commit efabec0bc085df66eaeded1acc2994abf5b26e71
Author: Misha Dmitriev
Date:   Mon Dec 18 13:12:57 2017 -0800

    HIVE-17684: Map Join memory exhaustion check broken in Hive on Spark.

diff --git a/common/src/java/org/apache/hadoop/hive/common/GcTimeMonitor.java b/common/src/java/org/apache/hadoop/hive/common/GcTimeMonitor.java
new file mode 100644
index 0000000000000000000000000000000000000000..edba6f9ad658e283b7fb059fdbd3524c33b19c6a
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/GcTimeMonitor.java
@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.common;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import java.util.List;
+
+/**
+ * Based on org.apache.hadoop.util.GcTimeMonitor. However, this class detects
+ * GC pauses using the same method as JvmPauseMonitor (by comparing the actual
+ * and expected thread sleep time) rather than by reading information from
+ * GarbageCollectorMXBean. The latter may sometimes report time spent in
+ * concurrent GC operations rather than GC pauses, which can produce inaccurate
+ * estimates of the time that the JVM is "frozen" due to GC.
+ *
+ * This class monitors the percentage of time the JVM is paused in GC within
+ * the specified observation window, say 1 minute. The user can provide a
+ * hook which will be called whenever this percentage exceeds the specified
+ * threshold.
+ */
+public class GcTimeMonitor extends Thread {
+
+  private final long maxGcTimePercentage;
+  private final long observationWindowNanos, sleepIntervalMs;
+  private final GcTimeAlertHandler alertHandler;
+
+  // Ring buffer containing GC timings and the timestamps at which they were taken
+  private final TsAndData[] gcDataBuf;
+  private int bufSize, startIdx, endIdx;
+
+  private long startTimeNanos;
+  private final GcData curData = new GcData();
+  private volatile boolean shouldRun = true;
+
+  /**
+   * Create an instance of GcTimeMonitor. Once it's started, it will stay alive
+   * and monitor GC time percentage until shutdown() is called. If you don't
+   * put a limit on the number of GcTimeMonitor instances that you create, and
+   * alertHandler != null, you must call shutdown() once the given instance is
+   * no longer needed. Otherwise, you may create a memory leak, because each
+   * running GcTimeMonitor keeps its alertHandler object in memory, which in
+   * turn may reference and keep in memory many more other objects.
+   *
+   * @param observationWindowMs the interval over which the percentage
+   *   of GC time should be calculated. A practical value would be somewhere
+   *   between 30 sec and several minutes.
+   * @param sleepIntervalMs how frequently this thread should wake up to check
+   *   GC timings. This is also the frequency with which alertHandler will be
+   *   invoked if GC time percentage exceeds the specified limit. A practical
+   *   value would likely be 500..1000 ms.
+   * @param maxGcTimePercentage a GC time percentage limit (0..100) within
+   *   observationWindowMs. Once this is exceeded, alertHandler will be
+   *   invoked every sleepIntervalMs milliseconds until GC time percentage
+   *   falls below this limit.
+   * @param alertHandler a single method in this interface is invoked when GC
+   *   time percentage exceeds the specified limit.
+   */
+  public GcTimeMonitor(long observationWindowMs, long sleepIntervalMs,
+      int maxGcTimePercentage, GcTimeAlertHandler alertHandler) {
+    Preconditions.checkArgument(observationWindowMs > 0);
+    Preconditions.checkArgument(
+        sleepIntervalMs > 0 && sleepIntervalMs < observationWindowMs);
+    Preconditions.checkArgument(
+        maxGcTimePercentage >= 0 && maxGcTimePercentage <= 100);
+
+    this.observationWindowNanos = observationWindowMs * 1000000;
+    this.sleepIntervalMs = sleepIntervalMs;
+    this.maxGcTimePercentage = maxGcTimePercentage;
+    this.alertHandler = alertHandler;
+
+    bufSize = (int) (observationWindowMs / sleepIntervalMs + 2);
+    // Prevent the user from accidentally creating an abnormally big buffer,
+    // which will result in slow calculations and likely inaccuracy.
+    Preconditions.checkArgument(bufSize <= 128 * 1024);
+    gcDataBuf = new TsAndData[bufSize];
+    for (int i = 0; i < bufSize; i++) {
+      gcDataBuf[i] = new TsAndData();
+    }
+
+    this.setDaemon(true);
+    this.setName("GcTimeMonitor obsWindow = " + observationWindowMs +
+        ", sleepInterval = " + sleepIntervalMs +
+        ", maxGcTimePerc = " + maxGcTimePercentage);
+  }
+
+  @Override
+  public void run() {
+    startTimeNanos = System.nanoTime();
+    gcDataBuf[startIdx].setValues(startTimeNanos, 0);
+
+    while (shouldRun) {
+      long intervalStartTsNanos = System.nanoTime();
+      try {
+        Thread.sleep(sleepIntervalMs);
+      } catch (InterruptedException ie) {
+        return;
+      }
+      long intervalEndTsNanos = System.nanoTime();
+
+      calculateGCTimePercentageWithinObservedInterval(intervalStartTsNanos, intervalEndTsNanos);
+      if (alertHandler != null &&
+          curData.gcTimePercentage > maxGcTimePercentage) {
+        alertHandler.alert(curData.clone());
+      }
+    }
+  }
+
+  public void shutdown() {
+    shouldRun = false;
+  }
+
+  /** Returns a copy of the most recent data measured by this monitor. */
+  public GcData getLatestGcData() {
+    return curData.clone();
+  }
+
+  private void calculateGCTimePercentageWithinObservedInterval(
+      long intervalStartTsNanos, long intervalEndTsNanos) {
+    long gcTimeWithinSleepIntervalNanos =
+        intervalEndTsNanos - intervalStartTsNanos - sleepIntervalMs * 1000000;
+    long totalGcTimeNanos = curData.totalGcTimeNanos + gcTimeWithinSleepIntervalNanos;
+
+    long gcMonitorRunTimeNanos = intervalEndTsNanos - startTimeNanos;
+
+    endIdx = (endIdx + 1) % bufSize;
+    gcDataBuf[endIdx].setValues(intervalEndTsNanos, gcTimeWithinSleepIntervalNanos);
+
+    // Update the observation window so that it covers the last observationWindowNanos
+    // period. For that, move startIdx forward until we reach the first buffer entry with
+    // a timestamp within the observation window.
+    long startObsWindowTsNanos = intervalEndTsNanos - observationWindowNanos;
+    while (gcDataBuf[startIdx].tsNanos < startObsWindowTsNanos && startIdx != endIdx) {
+      startIdx = (startIdx + 1) % bufSize;
+    }
+
+    // Calculate total GC time within observationWindowMs.
+    // We should be careful about GC time that passed before the first timestamp
+    // in our observation window.
+    long gcTimeWithinObservationWindowNanos = Math.min(
+        gcDataBuf[startIdx].gcPauseNanos, gcDataBuf[startIdx].tsNanos - startObsWindowTsNanos);
+    if (startIdx != endIdx) {
+      for (int i = (startIdx + 1) % bufSize; i != endIdx;
+          i = (i + 1) % bufSize) {
+        gcTimeWithinObservationWindowNanos += gcDataBuf[i].gcPauseNanos;
+      }
+    }
+
+    curData.update(gcMonitorRunTimeNanos, totalGcTimeNanos,
+        (int) (gcTimeWithinObservationWindowNanos * 100 /
+            Math.min(observationWindowNanos, gcMonitorRunTimeNanos)));
+  }
+
+  /**
+   * The user can provide an instance of a class implementing this interface
+   * when initializing a GcTimeMonitor to receive alerts when GC time
+   * percentage exceeds the specified threshold.
+   */
+  public interface GcTimeAlertHandler {
+    void alert(GcData gcData);
+  }
+
+  /** Encapsulates data about GC pauses measured at a specific timestamp. */
+  public static class GcData implements Cloneable {
+    private long gcMonitorRunTimeNanos, totalGcTimeNanos;
+    private int gcTimePercentage;
+
+    /** Returns the time since the start of the associated GcTimeMonitor. */
+    public long getGcMonitorRunTimeMs() {
+      return gcMonitorRunTimeNanos / 1000000;
+    }
+
+    /** Returns accumulated GC time since this JVM started. */
+    public long getAccumulatedGcTimeMs() {
+      return totalGcTimeNanos / 1000000;
+    }
+
+    /**
+     * Returns the percentage (0..100) of time that the JVM spent in GC pauses
+     * within the observation window of the associated GcTimeMonitor.
+     */
+    public int getGcTimePercentage() {
+      return gcTimePercentage;
+    }
+
+    private synchronized void update(long gcMonitorRunTimeNanos,
+        long totalGcTimeNanos, int inGcTimePercentage) {
+      this.gcMonitorRunTimeNanos = gcMonitorRunTimeNanos;
+      this.totalGcTimeNanos = totalGcTimeNanos;
+      this.gcTimePercentage = inGcTimePercentage;
+    }
+
+    @Override
+    public synchronized GcData clone() {
+      try {
+        return (GcData) super.clone();
+      } catch (CloneNotSupportedException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  private static class TsAndData {
+    private long tsNanos;      // Timestamp when this measurement was taken
+    private long gcPauseNanos; // Total GC pause time within the interval between tsNanos
+                               // and the timestamp of the previous measurement
+
+    void setValues(long tsNanos, long gcPauseNanos) {
+      this.tsNanos = tsNanos;
+      this.gcPauseNanos = gcPauseNanos;
+    }
+  }
+
+  /**
+   * Simple 'main' to facilitate manual testing of the pause monitor.
+   *
+   * This main function just leaks memory. Running this class will quickly
+   * result in "GC hell" and subsequent alerts from the GcTimeMonitor.
+   */
+  public static void main(String[] args) throws Exception {
+    new GcTimeMonitor(20 * 1000, 500, 20,
+        new GcTimeMonitor.GcTimeAlertHandler() {
+          @Override
+          public void alert(GcData gcData) {
+            System.err.println(
+                "GcTimeMonitor alert. Current GC time percentage = " +
+                gcData.getGcTimePercentage() +
+                ", total run time = " + (gcData.getGcMonitorRunTimeMs() / 1000) + " sec" +
+                ", total GC time = " + (gcData.getAccumulatedGcTimeMs() / 1000) + " sec");
+          }
+        }).start();
+
+    List<String> list = Lists.newArrayList();
+    for (int i = 0; ; i++) {
+      list.add("This is a long string to fill memory quickly " + i);
+      if (i % 100000 == 0) {
+        System.out.println("Added " + i + " strings");
+        Thread.sleep(100);
+      }
+    }
+  }
+
+}
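The public surface of the new class is small: the four-argument constructor, the Thread lifecycle via start(), shutdown(), getLatestGcData(), and the optional alert callback. A minimal, self-contained usage sketch; the class name GcMonitorExample and the window/threshold values are illustrative, not taken from the patch:

import org.apache.hadoop.hive.common.GcTimeMonitor;

public class GcMonitorExample {
  public static void main(String[] args) throws Exception {
    // 30-second observation window, 500 ms sampling, alert above 25% GC time.
    GcTimeMonitor monitor = new GcTimeMonitor(30 * 1000, 500, 25,
        new GcTimeMonitor.GcTimeAlertHandler() {
          @Override
          public void alert(GcTimeMonitor.GcData gcData) {
            // Called every sleep interval while the threshold is exceeded.
            System.err.println("GC overhead at " + gcData.getGcTimePercentage() + "%");
          }
        });
    monitor.start();        // daemon thread; samples until shutdown() is called
    try {
      Thread.sleep(5000);   // application work would happen here
      // The monitor can also be polled directly, without an alert handler:
      System.out.println("Current GC time percentage: "
          + monitor.getLatestGcData().getGcTimePercentage());
    } finally {
      monitor.shutdown();   // prevents the handler from being pinned in memory
    }
  }
}

Because the thread is a daemon it will not block JVM exit, but, as the constructor javadoc warns, a monitor that is never shut down keeps its handler, and everything the handler references, reachable.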
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 40ea3ac0c5cd0943a4e9dbe2b0e8b952070a8a67..7e4d2686746271b2ccc552ebde4e7357efa6223b 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2052,6 +2052,10 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         "It means the data of the small table is too large to be held in memory."),
     HIVEHASHTABLESCALE("hive.mapjoin.check.memory.rows", (long)100000,
         "The number means after how many rows processed it needs to check the memory usage"),
+    HIVEHASHTABLEMAXGCTIMEPERCENTAGE("hive.mapjoin.max.gc.time.percentage", 60,
+        "The maximum share of wallclock time (a percentage, 0..100) that the JVM \n" +
+        "is allowed to spend in garbage collection while running the local task. \n" +
+        "If the GC time percentage exceeds this number, the local task will abort by itself."),
     HIVEDEBUGLOCALTASK("hive.debug.localtask",false, ""),
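Code reads the new knob through the usual ConfVars accessor; Operator.java later in this patch does exactly this during initialization. A short sketch of querying the effective value (the class name GcThresholdConfig is illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;

public class GcThresholdConfig {
  public static void main(String[] args) {
    Configuration conf = new HiveConf();
    // Defaults to 60 as declared above; hive-site.xml or -hiveconf overrides apply.
    int maxGcTimePercentage =
        HiveConf.getIntVar(conf, ConfVars.HIVEHASHTABLEMAXGCTIMEPERCENTAGE);
    System.out.println("hive.mapjoin.max.gc.time.percentage = " + maxGcTimePercentage);
  }
}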
diff --git a/data/conf/hive-site.xml b/data/conf/hive-site.xml
index 0c3adb4b0f96242f2ab32c815177ddf3caf103fc..7c645adc031aada36d96946a0933f3a102a9f2b1 100644
--- a/data/conf/hive-site.xml
+++ b/data/conf/hive-site.xml
@@ -204,6 +204,16 @@
 </property>

+<property>
+  <name>hive.mapjoin.max.gc.time.percentage</name>
+  <value>99</value>
+  <description>
+    Maximum percentage of wallclock time that the JVM can spend in GC.
+    If this limit is exceeded, the local task will abort by itself.
+    Tests may run in a very stressed environment, so this number is set very high to avoid false negatives.
+  </description>
+</property>
+
 <property>
   <name>hive.input.format</name>
   <value>org.apache.hadoop.hive.ql.io.CombineHiveInputFormat</value>
   <description>The default input format, if it is not specified, the system assigns it. It is set to HiveInputFormat for hadoop versions 17, 18 and 19, whereas it is set to CombineHiveInputFormat for hadoop 20. The user can always overwrite it - if there is a bug in CombineHiveInputFormat, it can always be manually set to HiveInputFormat.</description>
 </property>
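The very loose test value (99 instead of the default 60) follows from how the monitor measures pauses: any overrun beyond the requested sleep is charged to GC, and on an oversubscribed test host scheduling delays inflate that overrun even when the heap is healthy. A tiny standalone sketch of the sleep-delta measurement itself (not part of the patch):

public class SleepDeltaDemo {
  public static void main(String[] args) throws InterruptedException {
    long sleepIntervalMs = 500;
    long start = System.nanoTime();
    Thread.sleep(sleepIntervalMs);
    long end = System.nanoTime();
    // Whatever exceeds the requested sleep is time the thread could not run.
    // On an idle machine this is dominated by GC pauses; on a loaded machine
    // the scheduler contributes too, which is why tests use a loose threshold.
    long overrunNanos = end - start - sleepIntervalMs * 1_000_000L;
    System.out.println("Sleep overrun: " + (overrunNanos / 1_000_000) + " ms");
  }
}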
diff --git a/data/conf/spark/standalone/hive-site.xml b/data/conf/spark/standalone/hive-site.xml
index 79e388ec06fdc9fe31814a488b05daee5b1b782d..317631c16ec24aaff2221653e1f0c580226e4b73 100644
--- a/data/conf/spark/standalone/hive-site.xml
+++ b/data/conf/spark/standalone/hive-site.xml
@@ -236,7 +236,12 @@

 <property>
   <name>spark.driver.extraClassPath</name>
-  <value>${maven.local.repository}/org/apache/hive/hive-it-util/${hive.version}/hive-it-util-${hive.version}.jar:${maven.local.repository}/org/apache/hive/hive-exec/${hive.version}/hive-exec-${hive.version}.jar:${maven.local.repository}/org/antlr/antlr-runtime/${antlr.version}/antlr-runtime-${antlr.version}.jar</value>
+  <value>${maven.local.repository}/org/apache/hive/hive-it-util/${hive.version}/hive-it-util-${hive.version}.jar:${maven.local.repository}/org/apache/hive/hive-exec/${hive.version}/hive-exec-${hive.version}.jar:${maven.local.repository}/org/antlr/antlr-runtime/${antlr.version}/antlr-runtime-${antlr.version}.jar:${maven.local.repository}/org/apache/hadoop/hadoop-common/${hadoop.version}/hadoop-common-${hadoop.version}.jar</value>
+</property>
+
+<property>
+  <name>spark.executor.extraClassPath</name>
+  <value>${maven.local.repository}/org/apache/hadoop/hadoop-common/${hadoop.version}/hadoop-common-${hadoop.version}.jar</value>
 </property>

diff --git a/pom.xml b/pom.xml
index 9c15328eccc2c975e536892330e8a218d46fecbb..5e100fd45b1f8a6d2194cd9e9ed70b0d27f6d0e5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1329,6 +1329,7 @@
             <java.security.krb5.conf>${test.conf.dir}/krb5.conf</java.security.krb5.conf>
             <antlr.version>${antlr.version}</antlr.version>
+            <hadoop.version>${hadoop.version}</hadoop.version>
             <qfile>${qfile}</qfile>
             <initScript>${initScript}</initScript>
             <clustermode>${clustermode}</clustermode>
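The classpath additions make hadoop-common visible to the Spark driver and executor JVMs in the standalone test setup, presumably so the Hadoop classes the new code path touches resolve there as well. A quick, hedged way to confirm the jar is actually on a given JVM's classpath; the marker class is an assumption, any class known to live in hadoop-common works:

public class ClasspathCheck {
  public static void main(String[] args) {
    try {
      // org.apache.hadoop.conf.Configuration ships in hadoop-common.
      Class.forName("org.apache.hadoop.conf.Configuration");
      System.out.println("hadoop-common is on the classpath");
    } catch (ClassNotFoundException e) {
      System.err.println("hadoop-common is missing: " + e);
    }
  }
}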
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
index f01b67e17dcdabc2f0de9a421c58788eee5b8969..73592b9edb2ad2ec3c066e31909d0b0543255b38 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
@@ -32,7 +32,6 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
-import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionHandler;
 import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinEagerRowContainer;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKeyObject;
@@ -103,7 +102,6 @@
   private long rowNumber = 0;
   protected transient LogHelper console;
   private long hashTableScale;
-  private MapJoinMemoryExhaustionHandler memoryExhaustionHandler;

   /** Kryo ctor. */
   protected HashTableSinkOperator() {
@@ -126,7 +124,6 @@ protected void initializeOp(Configuration hconf) throws HiveException {
     super.initializeOp(hconf);
     boolean isSilent = HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVESESSIONSILENT);
     console = new LogHelper(LOG, isSilent);
-    memoryExhaustionHandler = new MapJoinMemoryExhaustionHandler(console, conf.getHashtableMemoryUsage());
     emptyRowContainer.addRow(emptyObjectArray);

     // for small tables only; so get the big table position first
@@ -230,6 +227,8 @@
    */
   @Override
   public void process(Object row, int tag) throws HiveException {
+    checkGcOverhead();
+
     byte alias = (byte)tag;
     // compute keys and values as StandardObjects. Use non-optimized key (MR).
     Object[] currentKey = new Object[joinKeys[alias].size()];
@@ -255,9 +254,6 @@ public void process(Object row, int tag) throws HiveException {
         rowContainer = emptyRowContainer;
       }
       rowNumber++;
-      if (rowNumber > hashTableScale && rowNumber % hashTableScale == 0) {
-        memoryExhaustionHandler.checkMemoryStatus(tableContainer.size(), rowNumber);
-      }
       tableContainer.put(key, rowContainer);
     } else if (rowContainer == emptyRowContainer) {
       rowContainer = rowContainer.copy();
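The trade is visible across these hunks: the old code could only afford a MapJoinMemoryExhaustionHandler.checkMemoryStatus() call every hashTableScale rows, while the new checkGcOverhead() is cheap enough to run on every process() call because it only reads a flag that a background thread maintains. A standalone sketch of that pattern; names are illustrative, and in the patch the flag lives in Operator while the error is MapJoinMemoryExhaustionError:

public class CheapGuardDemo {
  // In the patch, HiveGcTimeMonitor's alert handler stores into this field.
  private static volatile int lastAlertGcTimePercentage;
  private static final int CRITICAL_GC_TIME_PERCENTAGE = 60;

  static void checkGcOverhead() {
    if (lastAlertGcTimePercentage >= CRITICAL_GC_TIME_PERCENTAGE) {
      // Stand-in for MapJoinMemoryExhaustionError, which this sketch omits.
      throw new RuntimeException("GC time percentage = "
          + lastAlertGcTimePercentage + ", exceeded threshold.");
    }
  }

  public static void main(String[] args) {
    for (int row = 0; row < 1_000_000; row++) {
      checkGcOverhead();  // one volatile read per row; no modulo, no MXBean call
      // ... the real loop would insert the row into the hash table here ...
    }
    // No monitor thread runs in this sketch, so the guard never fires.
    System.out.println("Processed all rows without tripping the guard");
  }
}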
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
index 38316bf7fa0d8d5e0704160cb750b9c806cdc147..1223e993ce84ac714e8dc1c418f714d424db28f7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
@@ -35,9 +35,13 @@
 import java.util.concurrent.atomic.AtomicBoolean;

 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.GcTimeMonitor;
 import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionError;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.lib.Node;
@@ -126,6 +130,33 @@
   private transient boolean multiChildren;
   private transient int[] selected;

+  // GC time monitoring
+  static class HiveGcTimeMonitor extends GcTimeMonitor {
+    private final Logger log;
+
+    HiveGcTimeMonitor(int criticalGcTimePercentage, Logger log) {
+      super(45 * 1000, 500, criticalGcTimePercentage,
+          new GcTimeMonitor.GcTimeAlertHandler() {
+            @Override
+            public void alert(GcTimeMonitor.GcData gcData) {
+              lastAlertGcTimePercentage = gcData.getGcTimePercentage();
+              log.warn("GcTimeMonitor alert called. Current GC time % = " +
+                  lastAlertGcTimePercentage);
+            }
+          });
+      this.log = log;
+    }
+  }
+
+  // The GC time alert functionality below is used by the checkGcOverhead() method.
+  // That method may be called very frequently, and if it called
+  // GcTimeMonitor.getLatestGcData() every time, the synchronization and new
+  // object creation could add unnecessary overhead. So instead, GcTimeMonitor
+  // itself sets the "red flag" in lastAlertGcTimePercentage, and
+  // checkGcOverhead() may check it as frequently as needed.
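+  // Reading a volatile int is a single cheap memory load, so performing this
+  // check once per processed row does not add measurable overhead.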
+ protected static volatile int lastAlertGcTimePercentage, criticalGcTimePercentage; + private static volatile GcTimeMonitor hiveGcTimeMonitor; + // dummy operator (for not increasing seqId) protected Operator(String name, CompilationOpContext cContext) { this(); @@ -395,6 +426,13 @@ public final void initialize(Configuration hconf, ObjectInspector[] inputOIs) LOG.debug("Initialization Done " + id + " " + getName() + " done is reset."); } + if (hiveGcTimeMonitor == null) { + criticalGcTimePercentage = HiveConf.getIntVar( + hconf, ConfVars.HIVEHASHTABLEMAXGCTIMEPERCENTAGE); + hiveGcTimeMonitor = new HiveGcTimeMonitor(criticalGcTimePercentage, LOG); + hiveGcTimeMonitor.start(); + } + // let's wait on the async ops before continuing completeInitialization(asyncInitOperations); } @@ -1619,4 +1657,12 @@ public void setBucketingVersion(int bucketingVersion) { public int getBucketingVersion() { return bucketingVersion; } + + public static void checkGcOverhead() { + if (lastAlertGcTimePercentage >= criticalGcTimePercentage) { + String msg = "GC time percentage = " + lastAlertGcTimePercentage + + ", exceeded threshold."; + throw new MapJoinMemoryExhaustionError(msg); + } + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java index 4bc7568f9df8b1726fd5b17dd74e328544c150db..fc7a7dacfa242528043283be44b55c2baa0ee28b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java @@ -399,7 +399,8 @@ public int executeInProcess(DriverContext driverContext) { String message; if (throwable instanceof OutOfMemoryError || (throwable instanceof MapJoinMemoryExhaustionError)) { - message = "Hive Runtime Error: Map local work exhausted memory"; + message = "Hive Runtime Error: Map local work exhausted memory. " + + "Details: " + throwable; retVal = 3; } else { message = "Hive Runtime Error: Map local work failed";
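End to end, the mechanism added by this commit works as follows: HiveGcTimeMonitor's alert handler records the latest GC time percentage in a volatile field; checkGcOverhead() throws MapJoinMemoryExhaustionError once that field reaches the configured threshold; and MapredLocalTask above converts that error (or a plain OutOfMemoryError) into the "exhausted memory" message with return value 3. The same loop can be reproduced outside Hive with the new GcTimeMonitor; the sketch below substitutes a plain java.lang.Error for MapJoinMemoryExhaustionError, and should be run with a small heap such as -Xmx64m to trigger the abort:

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hive.common.GcTimeMonitor;

public class GcAbortDemo {
  private static volatile int lastAlertGcTimePercentage;

  public static void main(String[] args) {
    GcTimeMonitor monitor = new GcTimeMonitor(20 * 1000, 500, 20,
        new GcTimeMonitor.GcTimeAlertHandler() {
          @Override
          public void alert(GcTimeMonitor.GcData gcData) {
            lastAlertGcTimePercentage = gcData.getGcTimePercentage();
          }
        });
    monitor.start();

    List<String> leak = new ArrayList<>();
    try {
      for (int i = 0; ; i++) {
        if (lastAlertGcTimePercentage >= 20) {
          // Hive throws MapJoinMemoryExhaustionError at this point.
          throw new Error("GC time percentage = "
              + lastAlertGcTimePercentage + ", exceeded threshold.");
        }
        leak.add("row-" + i);  // simulates an ever-growing map-join hash table
      }
    } catch (Error e) {
      // Depending on timing, an OutOfMemoryError may surface before the GC
      // guard fires; MapredLocalTask handles both cases identically.
      System.err.println("Aborting local task: " + e);
      monitor.shutdown();
    }
  }
}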