commit 909035ce7804c8e24817b2b4ea9642dc94df5ed2
Author: Misha Dmitriev
Date:   Fri Sep 14 18:10:00 2018 -0700

    HIVE-17684: Map Join memory exhaustion check broken in Hive on Spark.

diff --git a/common/src/java/org/apache/hadoop/hive/common/GcTimeMonitor.java b/common/src/java/org/apache/hadoop/hive/common/GcTimeMonitor.java
new file mode 100644
index 0000000000000000000000000000000000000000..edba6f9ad658e283b7fb059fdbd3524c33b19c6a
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/GcTimeMonitor.java
@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.common;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import java.util.List;
+
+/**
+ * Based on org.apache.hadoop.util.GcTimeMonitor. However, this class detects
+ * GC pauses using the same method as JvmPauseMonitor (by comparing the actual
+ * and expected thread sleep time) rather than by reading information from
+ * GarbageCollectionMXBean. The latter may sometimes report time spent in
+ * concurrent GC operations rather than in GC pauses, which may result in
+ * inaccurate results when trying to estimate the time that the JVM is
+ * "frozen" due to GC.
+ *
+ * This class monitors the percentage of time the JVM is paused in GC within
+ * the specified observation window, say 1 minute. The user can provide a
+ * hook which will be called whenever this percentage exceeds the specified
+ * threshold.
+ */
+public class GcTimeMonitor extends Thread {
+
+  private final long maxGcTimePercentage;
+  private final long observationWindowNanos, sleepIntervalMs;
+  private final GcTimeAlertHandler alertHandler;
+
+  // Ring buffer containing GC timings and the timestamps at which they were taken
+  private final TsAndData[] gcDataBuf;
+  private int bufSize, startIdx, endIdx;
+
+  private long startTimeNanos;
+  private final GcData curData = new GcData();
+  private volatile boolean shouldRun = true;
+
+  /**
+   * Creates an instance of GcTimeMonitor. Once started, it stays alive and
+   * monitors the GC time percentage until shutdown() is called. If you don't
+   * put a limit on the number of GcTimeMonitor instances that you create, and
+   * alertHandler != null, you must call shutdown() once the given instance is
+   * no longer needed. Otherwise you may create a memory leak, because each
+   * running GcTimeMonitor keeps its alertHandler object in memory, which in
+   * turn may reference and keep in memory many other objects.
+   *
+   * @param observationWindowMs the interval over which the percentage
+   *   of GC time should be calculated. A practical value would be somewhere
+   *   between 30 sec and several minutes.
+   * @param sleepIntervalMs how frequently this thread should wake up to check
+   *   GC timings. This is also the frequency with which alertHandler will be
+   *   invoked if the GC time percentage exceeds the specified limit. A practical
+   *   value would likely be 500..1000 ms.
+   * @param maxGcTimePercentage a GC time percentage limit (0..100) within
+   *   observationWindowMs. Once this is exceeded, alertHandler will be
+   *   invoked every sleepIntervalMs milliseconds until the GC time percentage
+   *   falls below this limit.
+   * @param alertHandler a handler whose single method is invoked when the GC
+   *   time percentage exceeds the specified limit.
+   */
+  public GcTimeMonitor(long observationWindowMs, long sleepIntervalMs,
+      int maxGcTimePercentage, GcTimeAlertHandler alertHandler) {
+    Preconditions.checkArgument(observationWindowMs > 0);
+    Preconditions.checkArgument(
+        sleepIntervalMs > 0 && sleepIntervalMs < observationWindowMs);
+    Preconditions.checkArgument(
+        maxGcTimePercentage >= 0 && maxGcTimePercentage <= 100);
+
+    this.observationWindowNanos = observationWindowMs * 1000000;
+    this.sleepIntervalMs = sleepIntervalMs;
+    this.maxGcTimePercentage = maxGcTimePercentage;
+    this.alertHandler = alertHandler;
+
+    bufSize = (int) (observationWindowMs / sleepIntervalMs + 2);
+    // Prevent the user from accidentally creating an abnormally big buffer,
+    // which would result in slow calculations and likely inaccuracy.
+    Preconditions.checkArgument(bufSize <= 128 * 1024);
+    gcDataBuf = new TsAndData[bufSize];
+    for (int i = 0; i < bufSize; i++) {
+      gcDataBuf[i] = new TsAndData();
+    }
+
+    this.setDaemon(true);
+    this.setName("GcTimeMonitor obsWindow = " + observationWindowMs +
+        ", sleepInterval = " + sleepIntervalMs +
+        ", maxGcTimePerc = " + maxGcTimePercentage);
+  }
+
+  @Override
+  public void run() {
+    startTimeNanos = System.nanoTime();
+    gcDataBuf[startIdx].setValues(startTimeNanos, 0);
+
+    while (shouldRun) {
+      long intervalStartTsNanos = System.nanoTime();
+      try {
+        Thread.sleep(sleepIntervalMs);
+      } catch (InterruptedException ie) {
+        return;
+      }
+      long intervalEndTsNanos = System.nanoTime();
+
+      calculateGCTimePercentageWithinObservedInterval(intervalStartTsNanos, intervalEndTsNanos);
+      if (alertHandler != null &&
+          curData.gcTimePercentage > maxGcTimePercentage) {
+        alertHandler.alert(curData.clone());
+      }
+    }
+  }
+
+  public void shutdown() {
+    shouldRun = false;
+  }
+
+  /** Returns a copy of the most recent data measured by this monitor. */
+  public GcData getLatestGcData() {
+    return curData.clone();
+  }
+
+  private void calculateGCTimePercentageWithinObservedInterval(
+      long intervalStartTsNanos, long intervalEndTsNanos) {
+    long gcTimeWithinSleepIntervalNanos =
+        intervalEndTsNanos - intervalStartTsNanos - sleepIntervalMs * 1000000;
+    long totalGcTimeNanos = curData.totalGcTimeNanos + gcTimeWithinSleepIntervalNanos;
+
+    long gcMonitorRunTimeNanos = intervalEndTsNanos - startTimeNanos;
+
+    endIdx = (endIdx + 1) % bufSize;
+    gcDataBuf[endIdx].setValues(intervalEndTsNanos, gcTimeWithinSleepIntervalNanos);
+
+    // Update the observation window so that it covers the last observationWindowNanos
+    // period. For that, move startIdx forward until we reach the first buffer entry with
+    // a timestamp within the observation window.
+    long startObsWindowTsNanos = intervalEndTsNanos - observationWindowNanos;
+    while (gcDataBuf[startIdx].tsNanos < startObsWindowTsNanos && startIdx != endIdx) {
+      startIdx = (startIdx + 1) % bufSize;
+    }
+
+    // Calculate the total GC time within observationWindowMs.
+    // We should be careful about GC time that passed before the first timestamp
+    // in our observation window.
+    long gcTimeWithinObservationWindowNanos = Math.min(
+        gcDataBuf[startIdx].gcPauseNanos, gcDataBuf[startIdx].tsNanos - startObsWindowTsNanos);
+    if (startIdx != endIdx) {
+      for (int i = (startIdx + 1) % bufSize; i != endIdx;
+          i = (i + 1) % bufSize) {
+        gcTimeWithinObservationWindowNanos += gcDataBuf[i].gcPauseNanos;
+      }
+    }
+
+    curData.update(gcMonitorRunTimeNanos, totalGcTimeNanos,
+        (int) (gcTimeWithinObservationWindowNanos * 100 /
+            Math.min(observationWindowNanos, gcMonitorRunTimeNanos)));
+  }
+
+  /**
+   * The user can provide an instance of a class implementing this interface
+   * when initializing a GcTimeMonitor to receive alerts when the GC time
+   * percentage exceeds the specified threshold.
+   */
+  public interface GcTimeAlertHandler {
+    void alert(GcData gcData);
+  }
+
+  /** Encapsulates data about GC pauses measured at a specific timestamp. */
+  public static class GcData implements Cloneable {
+    private long gcMonitorRunTimeNanos, totalGcTimeNanos;
+    private int gcTimePercentage;
+
+    /** Returns the time since the start of the associated GcTimeMonitor. */
+    public long getGcMonitorRunTimeMs() {
+      return gcMonitorRunTimeNanos / 1000000;
+    }
+
+    /** Returns the accumulated GC time since this JVM started. */
+    public long getAccumulatedGcTimeMs() {
+      return totalGcTimeNanos / 1000000;
+    }
+
+    /**
+     * Returns the percentage (0..100) of time that the JVM spent in GC pauses
+     * within the observation window of the associated GcTimeMonitor.
+     */
+    public int getGcTimePercentage() {
+      return gcTimePercentage;
+    }
+
+    private synchronized void update(long gcMonitorRunTimeNanos,
+        long totalGcTimeNanos, int inGcTimePercentage) {
+      this.gcMonitorRunTimeNanos = gcMonitorRunTimeNanos;
+      this.totalGcTimeNanos = totalGcTimeNanos;
+      this.gcTimePercentage = inGcTimePercentage;
+    }
+
+    @Override
+    public synchronized GcData clone() {
+      try {
+        return (GcData) super.clone();
+      } catch (CloneNotSupportedException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  private static class TsAndData {
+    private long tsNanos;      // Timestamp when this measurement was taken
+    private long gcPauseNanos; // Total GC pause time within the interval between this
+                               // timestamp and the timestamp of the previous measurement
+
+    void setValues(long tsNanos, long gcPauseNanos) {
+      this.tsNanos = tsNanos;
+      this.gcPauseNanos = gcPauseNanos;
+    }
+  }
+
+  /**
+   * Simple 'main' to facilitate manual testing of the pause monitor.
+   *
+   * This main function just leaks memory. Running this class will quickly
+   * result in "GC hell" and subsequent alerts from the GcTimeMonitor.
+   */
+  public static void main(String[] args) throws Exception {
+    new GcTimeMonitor(20 * 1000, 500, 20,
+        new GcTimeMonitor.GcTimeAlertHandler() {
+          @Override
+          public void alert(GcData gcData) {
+            System.err.println(
+                "GcTimeMonitor alert. Current GC time percentage = " +
+                gcData.getGcTimePercentage() +
+                ", total run time = " + (gcData.getGcMonitorRunTimeMs() / 1000) + " sec" +
+                ", total GC time = " + (gcData.getAccumulatedGcTimeMs() / 1000) + " sec");
+          }
+        }).start();
+
+    List<String> list = Lists.newArrayList();
+    for (int i = 0; ; i++) {
+      list.add("This is a long string to fill memory quickly " + i);
+      if (i % 100000 == 0) {
+        System.out.println("Added " + i + " strings");
+        Thread.sleep(100);
+      }
+    }
+  }
+
+}
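A minimal standalone usage sketch for the new monitor (hypothetical caller code, not part of this patch; it only relies on the constructor, start(), getLatestGcData() and shutdown() defined above, and on GcTimeAlertHandler being a single-method interface usable as a lambda):

    // Hypothetical caller: 60 s window, 1 s polling, alert above 30% GC time.
    GcTimeMonitor monitor = new GcTimeMonitor(60 * 1000, 1000, 30,
        gcData -> System.err.println(
            "GC pauses took " + gcData.getGcTimePercentage() + "% of the last minute"));
    monitor.start();
    // ... run some memory-intensive work ...
    System.out.println("Current GC time percentage: "
        + monitor.getLatestGcData().getGcTimePercentage());
    monitor.shutdown();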
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 8a561e5771ff83b040111f8313233d5f291d2bbe..a6364969fffe3aab20a09b749e39c5a576e85260 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2064,13 +2064,21 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
     HIVEHASHTABLEFOLLOWBYGBYMAXMEMORYUSAGE("hive.mapjoin.followby.gby.localtask.max.memory.usage", (float) 0.55,
         "This number means how much memory the local task can take to hold the key/value into an in-memory hash table \n" +
         "when this map join is followed by a group by. If the local task's memory usage is more than this number, \n" +
-        "the local task will abort by itself. It means the data of the small table is too large to be held in memory."),
+        "the local task will abort by itself. It means the data of the small table is too large " +
+        "to be held in memory. Does not apply to Hive-on-Spark (replaced by " +
+        "hive.mapjoin.max.gc.time.percentage)"),
     HIVEHASHTABLEMAXMEMORYUSAGE("hive.mapjoin.localtask.max.memory.usage", (float) 0.90,
         "This number means how much memory the local task can take to hold the key/value into an in-memory hash table. \n" +
         "If the local task's memory usage is more than this number, the local task will abort by itself. \n" +
-        "It means the data of the small table is too large to be held in memory."),
+        "It means the data of the small table is too large to be held in memory. Does not apply to " +
+        "Hive-on-Spark (replaced by hive.mapjoin.max.gc.time.percentage)"),
     HIVEHASHTABLESCALE("hive.mapjoin.check.memory.rows", (long)100000,
         "The number means after how many rows processed it needs to check the memory usage"),
+    HIVEHASHTABLEMAXGCTIMEPERCENTAGE("hive.mapjoin.max.gc.time.percentage", (float) 0.60,
+        new RangeValidator(0.0f, 1.0f), "This number means how much time (what percentage, " +
+        "0..1, of wallclock time) the JVM is allowed to spend in garbage collection when running " +
+        "the local task. If GC time percentage exceeds this number, the local task will abort by " +
+        "itself. Applies to Hive-on-Spark only"),
     HIVEDEBUGLOCALTASK("hive.debug.localtask",false, ""),
diff --git a/data/conf/hive-site.xml b/data/conf/hive-site.xml
index 0c3adb4b0f96242f2ab32c815177ddf3caf103fc..0daf9adc717bc1c4413d2e34691c26a3e2585c77 100644
--- a/data/conf/hive-site.xml
+++ b/data/conf/hive-site.xml
@@ -204,6 +204,16 @@
 
+  <property>
+    <name>hive.mapjoin.max.gc.time.percentage</name>
+    <value>0.99</value>
+    <description>
+      Maximum percentage of wallclock time that the JVM can spend in GC.
+      If this limit is exceeded, the local task will abort by itself.
+      Tests may run in a very stressed environment, so this number is set very high to avoid false negatives.
+    </description>
+  </property>
+
   <property>
     <name>hive.input.format</name>
     <value>org.apache.hadoop.hive.ql.io.CombineHiveInputFormat</value>
     <description>The default input format, if it is not specified, the system assigns it. It is set to HiveInputFormat for hadoop versions 17, 18 and 19, whereas it is set to CombineHiveInputFormat for hadoop 20. The user can always overwrite it - if there is a bug in CombineHiveInputFormat, it can always be manually set to HiveInputFormat.</description>
   </property>
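The new property is a 0..1 fraction, while GcTimeMonitor works with integer percentages; the conversion performed by the Spark checker later in this patch amounts to the following (illustrative sketch only, assuming a HiveConf instance; the 0.4 value is made up):

    HiveConf conf = new HiveConf();
    conf.setFloatVar(HiveConf.ConfVars.HIVEHASHTABLEMAXGCTIMEPERCENTAGE, 0.4f);
    // 0.4 means "abort the local task once GC pauses exceed 40% of wallclock time
    // within the observation window"; the checker converts it to an int percentage:
    int criticalGcTimePercentage = (int) (HiveConf.getFloatVar(
        conf, HiveConf.ConfVars.HIVEHASHTABLEMAXGCTIMEPERCENTAGE) * 100);  // 40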
diff --git a/data/conf/spark/standalone/hive-site.xml b/data/conf/spark/standalone/hive-site.xml
index 79e388ec06fdc9fe31814a488b05daee5b1b782d..317631c16ec24aaff2221653e1f0c580226e4b73 100644
--- a/data/conf/spark/standalone/hive-site.xml
+++ b/data/conf/spark/standalone/hive-site.xml
@@ -236,7 +236,12 @@
 <property>
   <name>spark.driver.extraClassPath</name>
-  <value>${maven.local.repository}/org/apache/hive/hive-it-util/${hive.version}/hive-it-util-${hive.version}.jar:${maven.local.repository}/org/apache/hive/hive-exec/${hive.version}/hive-exec-${hive.version}.jar:${maven.local.repository}/org/antlr/antlr-runtime/${antlr.version}/antlr-runtime-${antlr.version}.jar</value>
+  <value>${maven.local.repository}/org/apache/hive/hive-it-util/${hive.version}/hive-it-util-${hive.version}.jar:${maven.local.repository}/org/apache/hive/hive-exec/${hive.version}/hive-exec-${hive.version}.jar:${maven.local.repository}/org/antlr/antlr-runtime/${antlr.version}/antlr-runtime-${antlr.version}.jar:${maven.local.repository}/org/apache/hadoop/hadoop-common/${hadoop.version}/hadoop-common-${hadoop.version}.jar</value>
+</property>
+
+<property>
+  <name>spark.executor.extraClassPath</name>
+  <value>${maven.local.repository}/org/apache/hadoop/hadoop-common/${hadoop.version}/hadoop-common-${hadoop.version}.jar</value>
 </property>
diff --git a/pom.xml b/pom.xml
index 9c15328eccc2c975e536892330e8a218d46fecbb..5e100fd45b1f8a6d2194cd9e9ed70b0d27f6d0e5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1329,6 +1329,7 @@
     ${test.conf.dir}/krb5.conf
     ${antlr.version}
+    ${hadoop.version}
     ${qfile}
     ${initScript}
     ${clustermode}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DefaultMemoryExhaustionChecker.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DefaultMemoryExhaustionChecker.java
new file mode 100644
index 0000000000000000000000000000000000000000..b50a66efcf1fb1d8ea55709a9803a4718d58c339
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DefaultMemoryExhaustionChecker.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionHandler;
+import org.apache.hadoop.hive.ql.plan.HashTableSinkDesc;
+import org.apache.hadoop.hive.ql.session.SessionState;
+
+/**
+ * A {@link MemoryExhaustionChecker} that uses a {@link MapJoinMemoryExhaustionHandler}
+ * to check memory overhead.
+ */
+class DefaultMemoryExhaustionChecker implements MemoryExhaustionChecker {
+
+  private MapJoinMemoryExhaustionHandler memoryExhaustionHandler;
+
+  DefaultMemoryExhaustionChecker(SessionState.LogHelper console,
+      HashTableSinkDesc hashTableSinkDesc) {
+    super();
+    this.memoryExhaustionHandler = new MapJoinMemoryExhaustionHandler(console,
+        hashTableSinkDesc.getHashtableMemoryUsage());
+  }
+
+  @Override
+  public void checkMemoryOverhead(long rowNumber, long hashTableScale, int tableContainerSize) {
+    if (rowNumber > hashTableScale && rowNumber % hashTableScale == 0) {
+      this.memoryExhaustionHandler.checkMemoryStatus(tableContainerSize, rowNumber);
+    }
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
index f01b67e17dcdabc2f0de9a421c58788eee5b8969..36c93350d5419406acba3dae6d47b53c3d58cc1d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
@@ -32,7 +32,6 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
-import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionHandler;
 import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinEagerRowContainer;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKeyObject;
@@ -103,7 +102,7 @@ private long rowNumber = 0;
   protected transient LogHelper console;
   private long hashTableScale;
-  private MapJoinMemoryExhaustionHandler memoryExhaustionHandler;
+  private MemoryExhaustionChecker memoryExhaustionChecker;
 
   /** Kryo ctor. */
   protected HashTableSinkOperator() {
@@ -126,7 +125,7 @@ protected void initializeOp(Configuration hconf) throws HiveException {
     super.initializeOp(hconf);
     boolean isSilent = HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVESESSIONSILENT);
     console = new LogHelper(LOG, isSilent);
-    memoryExhaustionHandler = new MapJoinMemoryExhaustionHandler(console, conf.getHashtableMemoryUsage());
+    memoryExhaustionChecker = MemoryExhaustionCheckerFactory.getChecker(console, hconf, conf);
     emptyRowContainer.addRow(emptyObjectArray);
 
     // for small tables only; so get the big table position first
@@ -255,9 +254,7 @@ public void process(Object row, int tag) throws HiveException {
         rowContainer = emptyRowContainer;
       }
       rowNumber++;
-      if (rowNumber > hashTableScale && rowNumber % hashTableScale == 0) {
-        memoryExhaustionHandler.checkMemoryStatus(tableContainer.size(), rowNumber);
-      }
+      memoryExhaustionChecker.checkMemoryOverhead(rowNumber, hashTableScale, tableContainer.size());
       tableContainer.put(key, rowContainer);
     } else if (rowContainer == emptyRowContainer) {
       rowContainer = rowContainer.copy();
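With this refactoring, process() delegates to checkMemoryOverhead() on every row, and the non-Spark checker keeps the original cadence internally; roughly the following (illustrative values only, using the default hive.mapjoin.check.memory.rows of 100000 shown in the HiveConf hunk above):

    long hashTableScale = 100000;   // default hive.mapjoin.check.memory.rows
    for (long rowNumber = 1; rowNumber <= 300000; rowNumber++) {
      if (rowNumber > hashTableScale && rowNumber % hashTableScale == 0) {
        // DefaultMemoryExhaustionChecker delegates to checkMemoryStatus() here,
        // i.e. at rows 200000 and 300000 -- the same cadence as before this patch.
        System.out.println("memory check at row " + rowNumber);
      }
    }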
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MemoryExhaustionChecker.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MemoryExhaustionChecker.java
new file mode 100644
index 0000000000000000000000000000000000000000..7500430d59bf35f50c0447deeed45342c12c9652
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MemoryExhaustionChecker.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec;
+
+/**
+ * Checks the memory overhead when running {@link HashTableSinkOperator}. Throws a
+ * {@link org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionError}
+ * if too much memory is being used.
+ */
+interface MemoryExhaustionChecker {
+
+  void checkMemoryOverhead(long rowNumber, long hashTableScale, int tableContainerSize);
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MemoryExhaustionCheckerFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MemoryExhaustionCheckerFactory.java
new file mode 100644
index 0000000000000000000000000000000000000000..086df68c8d448ed8264d568687f7cd7efc12a576
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MemoryExhaustionCheckerFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.plan.HashTableSinkDesc;
+import org.apache.hadoop.hive.ql.session.SessionState;
+
+class MemoryExhaustionCheckerFactory {
+
+  private MemoryExhaustionCheckerFactory() {
+    // No instances allowed
+  }
+
+  static MemoryExhaustionChecker getChecker(SessionState.LogHelper console, Configuration conf,
+      HashTableSinkDesc hashTableSinkDesc) {
+    if ("spark".equals(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE))) {
+      return SparkMemoryExhaustionChecker.get(conf);
+    } else {
+      return new DefaultMemoryExhaustionChecker(console, hashTableSinkDesc);
+    }
+  }
+}
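The dispatch above means the execution engine alone decides which checker a HashTableSinkOperator gets. A sketch of the effect (hypothetical test-style code in the same package; it assumes a SessionState.LogHelper "console" and a HashTableSinkDesc "desc" are already in scope):

    Configuration conf = new Configuration();
    conf.set(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE.varname, "spark");
    MemoryExhaustionChecker checker =
        MemoryExhaustionCheckerFactory.getChecker(console, conf, desc);
    // checker is the singleton SparkMemoryExhaustionChecker (GC-time based);
    // with "mr" or "tez" it would be a fresh DefaultMemoryExhaustionChecker
    // wrapping MapJoinMemoryExhaustionHandler, preserving the old behavior.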
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkMemoryExhaustionChecker.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkMemoryExhaustionChecker.java
new file mode 100644
index 0000000000000000000000000000000000000000..94057ae0c373f6a318fc410ece7f54ca04d6b923
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkMemoryExhaustionChecker.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.GcTimeMonitor;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionError;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link MemoryExhaustionChecker} specific to Hive-on-Spark. Unlike the
+ * {@link DefaultMemoryExhaustionChecker}, it uses a {@link GcTimeMonitor}
+ * to monitor how much time (what percentage of run time within the last
+ * minute or so) is spent in GC. If this value exceeds the value configured
+ * in {@link HiveConf.ConfVars#HIVEHASHTABLEMAXGCTIMEPERCENTAGE}, a
+ * {@link MapJoinMemoryExhaustionError} is thrown.
+ */
+class SparkMemoryExhaustionChecker implements MemoryExhaustionChecker {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SparkMemoryExhaustionChecker.class);
+
+  private static SparkMemoryExhaustionChecker INSTANCE;
+
+  // The GC time alert functionality below is used by the checkMemoryOverhead() method.
+  // That method may be called very frequently, and if it called
+  // GcTimeMonitor.getLatestGcData() every time, it could result in unnecessary
+  // overhead due to synchronization and new object creation. So instead,
+  // GcTimeMonitor itself sets the "red flag" in lastAlertGcTimePercentage,
+  // and checkMemoryOverhead() may check it as frequently as needed.
+  private volatile int lastAlertGcTimePercentage;
+  private final int criticalGcTimePercentage;
+
+  private SparkMemoryExhaustionChecker(Configuration conf) {
+    super();
+    criticalGcTimePercentage = (int) (HiveConf.getFloatVar(
+        conf, HiveConf.ConfVars.HIVEHASHTABLEMAXGCTIMEPERCENTAGE) * 100);
+    GcTimeMonitor hiveGcTimeMonitor = new HiveGcTimeMonitor(criticalGcTimePercentage, LOG);
+    hiveGcTimeMonitor.start();
+  }
+
+  static synchronized SparkMemoryExhaustionChecker get(Configuration conf) {
+    if (INSTANCE == null) {
+      INSTANCE = new SparkMemoryExhaustionChecker(conf);
+    }
+    return INSTANCE;
+  }
+
+  @Override
+  public void checkMemoryOverhead(long rowNumber, long hashTableScale, int tableContainerSize) {
+    if (lastAlertGcTimePercentage >= criticalGcTimePercentage) {
+      String msg = "GC time percentage = " + lastAlertGcTimePercentage + "% exceeded threshold " +
+          criticalGcTimePercentage + "%";
+      throw new MapJoinMemoryExhaustionError(msg);
+    }
+  }
+
+  // GC time monitoring
+  class HiveGcTimeMonitor extends GcTimeMonitor {
+
+    HiveGcTimeMonitor(int criticalGcTimePercentage, Logger log) {
+      super(45 * 1000, 200, criticalGcTimePercentage, gcData -> {
+        lastAlertGcTimePercentage = gcData.getGcTimePercentage();
+        log.warn("GcTimeMonitor alert called. Current GC time = " +
+            lastAlertGcTimePercentage + "%");
+      });
+    }
+  }
+}
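To make the numbers concrete: HiveGcTimeMonitor polls every 200 ms over a 45-second window, and a GC pause is inferred from how much the actual sleep overshoots the requested one. A small worked example of that arithmetic (the overshoot and accumulated total below are made-up values; only the 200 ms / 45 s constants and the 0.60 default come from this patch):

    long sleepIntervalMs = 200;                 // HiveGcTimeMonitor polling interval
    long observedSleepMs = 350;                 // the monitor thread actually slept this long
    long inferredGcPauseMs = observedSleepMs - sleepIntervalMs;   // 150 ms attributed to GC
    // If such pauses add up to 27000 ms within the 45000 ms observation window:
    int gcTimePercentage = (int) (27000L * 100 / 45000L);         // 60
    // With the default hive.mapjoin.max.gc.time.percentage of 0.60 (i.e. 60%),
    // the next checkMemoryOverhead() call throws MapJoinMemoryExhaustionError.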