diff --git a/common/src/java/org/apache/hadoop/hive/common/GcTimeMonitor.java b/common/src/java/org/apache/hadoop/hive/common/GcTimeMonitor.java
new file mode 100644
index 0000000000..edba6f9ad6
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/GcTimeMonitor.java
@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.common;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import java.util.List;
+
+/**
+ * Based on org.apache.hadoop.util.GcTimeMonitor. However, this class detects
+ * GC pauses using the same method as JvmPauseMonitor (by comparing the actual
+ * and expected thread sleep time) rather than by reading information from
+ * GarbageCollectionMXBean. The latter may sometimes report time spent in
+ * concurrent GC operations rather than GC pauses. This may result in inaccurate
+ * results when trying to estimate the time that the JVM is "frozen" due to GC.
+ *
+ * This class monitors the percentage of time the JVM is paused in GC within
+ * the specified observation window, say 1 minute. The user can provide a
+ * hook which will be called whenever this percentage exceeds the specified
+ * threshold.
+ */
+public class GcTimeMonitor extends Thread {
+
+  private final long maxGcTimePercentage;
+  private final long observationWindowNanos, sleepIntervalMs;
+  private final GcTimeAlertHandler alertHandler;
+
+  // Ring buffer of GC timings and the timestamps at which they were taken
+  private final TsAndData[] gcDataBuf;
+  private int bufSize, startIdx, endIdx;
+
+  private long startTimeNanos;
+  private final GcData curData = new GcData();
+  private volatile boolean shouldRun = true;
+
+  /**
+   * Creates an instance of GcTimeMonitor. Once started, it stays alive and
+   * monitors the GC time percentage until shutdown() is called. If the number
+   * of GcTimeMonitor instances you create is not bounded and alertHandler !=
+   * null, you must call shutdown() once a given instance is no longer needed.
+   * Otherwise you may create a memory leak, because each running GcTimeMonitor
+   * keeps its alertHandler object in memory, and the handler in turn may
+   * reference, and keep in memory, many other objects.
+   *
+   * @param observationWindowMs the interval over which the percentage
+   *   of GC time should be calculated. A practical value would be somewhere
+   *   between 30 sec and several minutes.
+   * @param sleepIntervalMs how frequently this thread should wake up to check
+   *   GC timings. This is also the frequency with which alertHandler will be
+   *   invoked if GC time percentage exceeds the specified limit. A practical
+   *   value would likely be 500..1000 ms.
+   * @param maxGcTimePercentage a GC time percentage limit (0..100) within
+   *   observationWindowMs. Once this is exceeded, alertHandler will be
+   *   invoked every sleepIntervalMs milliseconds until GC time percentage
+   *   falls below this limit.
+   * @param alertHandler an object whose alert() method is invoked when GC
+   *   time percentage exceeds the specified limit; may be null.
+   */
+  public GcTimeMonitor(long observationWindowMs, long sleepIntervalMs,
+      int maxGcTimePercentage, GcTimeAlertHandler alertHandler) {
+    Preconditions.checkArgument(observationWindowMs > 0);
+    Preconditions.checkArgument(
+        sleepIntervalMs > 0 && sleepIntervalMs < observationWindowMs);
+    Preconditions.checkArgument(
+        maxGcTimePercentage >= 0 && maxGcTimePercentage <= 100);
+
+    this.observationWindowNanos = observationWindowMs * 1000000;
+    this.sleepIntervalMs = sleepIntervalMs;
+    this.maxGcTimePercentage = maxGcTimePercentage;
+    this.alertHandler = alertHandler;
+
+    bufSize = (int) (observationWindowMs / sleepIntervalMs + 2);
+    // Prevent the user from accidentally creating an abnormally big buffer,
+    // which will result in slow calculations and likely inaccuracy.
+    Preconditions.checkArgument(bufSize <= 128 * 1024);
+    gcDataBuf = new TsAndData[bufSize];
+    for (int i = 0; i < bufSize; i++) {
+      gcDataBuf[i] = new TsAndData();
+    }
+
+    this.setDaemon(true);
+    this.setName("GcTimeMonitor obsWindow = " + observationWindowMs +
+        ", sleepInterval = " + sleepIntervalMs +
+        ", maxGcTimePerc = " + maxGcTimePercentage);
+  }
+
+  @Override
+  public void run() {
+    startTimeNanos = System.nanoTime();
+    gcDataBuf[startIdx].setValues(startTimeNanos, 0);
+
+    while (shouldRun) {
+      long intervalStartTsNanos = System.nanoTime();
+      try {
+        Thread.sleep(sleepIntervalMs);
+      } catch (InterruptedException ie) {
+        return;
+      }
+      long intervalEndTsNanos = System.nanoTime();
+
+      calculateGCTimePercentageWithinObservedInterval(intervalStartTsNanos, intervalEndTsNanos);
+      if (alertHandler != null &&
+          curData.gcTimePercentage > maxGcTimePercentage) {
+        alertHandler.alert(curData.clone());
+      }
+    }
+  }
+
+  public void shutdown() {
+    shouldRun = false;
+  }
+
+  /** Returns a copy of the most recent data measured by this monitor. */
+  public GcData getLatestGcData() {
+    return curData.clone();
+  }
+
+  private void calculateGCTimePercentageWithinObservedInterval(
+      long intervalStartTsNanos, long intervalEndTsNanos) {
+    long gcTimeWithinSleepIntervalNanos =
+        intervalEndTsNanos - intervalStartTsNanos - sleepIntervalMs * 1000000;
+    long totalGcTimeNanos = curData.totalGcTimeNanos + gcTimeWithinSleepIntervalNanos;
+
+    long gcMonitorRunTimeNanos = intervalEndTsNanos - startTimeNanos;
+
+    endIdx = (endIdx + 1) % bufSize;
+    gcDataBuf[endIdx].setValues(intervalEndTsNanos, gcTimeWithinSleepIntervalNanos);
+
+    // Update the observation window so that it covers the last observationWindowNanos
+    // period. For that, move startIdx forward until we reach the first buffer entry with
+    // timestamp within the observation window.
+    long startObsWindowTsNanos = intervalEndTsNanos - observationWindowNanos;
+    while (gcDataBuf[startIdx].tsNanos < startObsWindowTsNanos && startIdx != endIdx) {
+      startIdx = (startIdx + 1) % bufSize;
+    }
+
+    // Calculate total GC time within observationWindowMs.
+    // We should be careful about GC time that passed before the first timestamp
+    // in our observation window.
+    long gcTimeWithinObservationWindowNanos = Math.min(
+        gcDataBuf[startIdx].gcPauseNanos, gcDataBuf[startIdx].tsNanos - startObsWindowTsNanos);
+    if (startIdx != endIdx) {
+      for (int i = (startIdx + 1) % bufSize; i != endIdx;
+          i = (i + 1) % bufSize) {
+        gcTimeWithinObservationWindowNanos += gcDataBuf[i].gcPauseNanos;
+      }
+    }
+
+    curData.update(gcMonitorRunTimeNanos, totalGcTimeNanos,
+        (int) (gcTimeWithinObservationWindowNanos * 100 /
+            Math.min(observationWindowNanos, gcMonitorRunTimeNanos)));
+  }
+
+  /**
+   * The user can provide an instance of a class implementing this interface
+   * when initializing a GcTimeMonitor to receive alerts when GC time
+   * percentage exceeds the specified threshold.
+   */
+  public interface GcTimeAlertHandler {
+    void alert(GcData gcData);
+  }
+
+  /** Encapsulates data about GC pauses measured at a specific timestamp. */
+  public static class GcData implements Cloneable {
+    private long gcMonitorRunTimeNanos, totalGcTimeNanos;
+    private int gcTimePercentage;
+
+    /** Returns the time since the start of the associated GcTimeMonitor. */
+    public long getGcMonitorRunTimeMs() {
+      return gcMonitorRunTimeNanos / 1000000;
+    }
+
+    /** Returns accumulated GC time since this JVM started. */
+    public long getAccumulatedGcTimeMs() {
+      return totalGcTimeNanos / 1000000;
+    }
+
+    /**
+     * Returns the percentage (0..100) of time that the JVM spent in GC pauses
+     * within the observation window of the associated GcTimeMonitor.
+     */
+    public int getGcTimePercentage() {
+      return gcTimePercentage;
+    }
+
+    private synchronized void update(long gcMonitorRunTimeNanos,
+        long totalGcTimeNanos, int inGcTimePercentage) {
+      this.gcMonitorRunTimeNanos = gcMonitorRunTimeNanos;
+      this.totalGcTimeNanos = totalGcTimeNanos;
+      this.gcTimePercentage = inGcTimePercentage;
+    }
+
+    @Override
+    public synchronized GcData clone() {
+      try {
+        return (GcData) super.clone();
+      } catch (CloneNotSupportedException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  private static class TsAndData {
+    private long tsNanos;      // Timestamp when this measurement was taken
+    private long gcPauseNanos; // Total GC pause time within the interval between tsNanos
+                               // and the timestamp of the previous measurement
+
+    void setValues(long tsNanos, long gcPauseNanos) {
+      this.tsNanos = tsNanos;
+      this.gcPauseNanos = gcPauseNanos;
+    }
+  }
+
+  /**
+   * Simple 'main' to facilitate manual testing of the pause monitor.
+   *
+   * This main function just leaks memory. Running this class will quickly
+   * result in a "GC hell" and subsequent alerts from the GcTimeMonitor.
+   */
+  public static void main(String[] args) throws Exception {
+    new GcTimeMonitor(20 * 1000, 500, 20,
+        new GcTimeMonitor.GcTimeAlertHandler() {
+          @Override
+          public void alert(GcData gcData) {
+            System.err.println(
+                "GcTimeMonitor alert. Current GC time percentage = " +
+                gcData.getGcTimePercentage() +
+                ", total run time = " + (gcData.getGcMonitorRunTimeMs() / 1000) + " sec" +
+                ", total GC time = " + (gcData.getAccumulatedGcTimeMs() / 1000) + " sec");
+          }
+        }).start();
+
+    List<String> list = Lists.newArrayList();
+    for (int i = 0; ; i++) {
+      list.add("This is a long string to fill memory quickly " + i);
+      if (i % 100000 == 0) {
+        System.out.println("Added " + i + " strings");
+        Thread.sleep(100);
+      }
+    }
+  }
+
+}
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index aa58d7445c..35baec2227 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2045,13 +2045,21 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
     HIVEHASHTABLEFOLLOWBYGBYMAXMEMORYUSAGE("hive.mapjoin.followby.gby.localtask.max.memory.usage", (float) 0.55,
         "This number means how much memory the local task can take to hold the key/value into an in-memory hash table \n" +
         "when this map join is followed by a group by. If the local task's memory usage is more than this number, \n" +
-        "the local task will abort by itself. It means the data of the small table is too large to be held in memory."),
+        "the local task will abort by itself. It means the data of the small table is too large " +
+        "to be held in memory. Does not apply to Hive-on-Spark (replaced by " +
+        "hive.mapjoin.max.gc.time.percentage)"),
     HIVEHASHTABLEMAXMEMORYUSAGE("hive.mapjoin.localtask.max.memory.usage", (float) 0.90,
         "This number means how much memory the local task can take to hold the key/value into an in-memory hash table. \n" +
         "If the local task's memory usage is more than this number, the local task will abort by itself. \n" +
-        "It means the data of the small table is too large to be held in memory."),
+        "It means the data of the small table is too large to be held in memory. Does not apply to " +
+        "Hive-on-Spark (replaced by hive.mapjoin.max.gc.time.percentage)"),
     HIVEHASHTABLESCALE("hive.mapjoin.check.memory.rows", (long)100000,
         "The number means after how many rows processed it needs to check the memory usage"),
+    HIVEHASHTABLEMAXGCTIMEPERCENTAGE("hive.mapjoin.max.gc.time.percentage", (float) 0.60,
+        new RangeValidator(0, 1), "This number means what fraction (0..1) of wallclock time " +
+        "the JVM is allowed to spend in garbage collection when running the local task. " +
+        "If this limit is exceeded, the local task will abort by itself. " +
+        "Applies to Hive-on-Spark only"),
     HIVEDEBUGLOCALTASK("hive.debug.localtask",false, ""),
diff --git a/data/conf/hive-site.xml b/data/conf/hive-site.xml
index 0c3adb4b0f..0daf9adc71 100644
--- a/data/conf/hive-site.xml
+++ b/data/conf/hive-site.xml
@@ -203,6 +203,16 @@
     Whether Hive ignores the mapjoin hint
 </property>
 
+<property>
+  <name>hive.mapjoin.max.gc.time.percentage</name>
+  <value>0.99</value>
+  <description>
+    Maximum percentage of wallclock time that the JVM can spend in GC.
+    If this limit is exceeded, the local task will abort by itself.
+    Tests may run in a very stressed environment, so this number is set very high to avoid spurious failures.
+  </description>
+</property>
+
 <property>
   <name>hive.input.format</name>
   <value>org.apache.hadoop.hive.ql.io.CombineHiveInputFormat</value>
diff --git a/data/conf/spark/standalone/hive-site.xml b/data/conf/spark/standalone/hive-site.xml
index 79e388ec06..317631c16e 100644
--- a/data/conf/spark/standalone/hive-site.xml
+++ b/data/conf/spark/standalone/hive-site.xml
@@ -236,7 +236,12 @@
   <name>spark.driver.extraClassPath</name>
-  <value>${maven.local.repository}/org/apache/hive/hive-it-util/${hive.version}/hive-it-util-${hive.version}.jar:${maven.local.repository}/org/apache/hive/hive-exec/${hive.version}/hive-exec-${hive.version}.jar:${maven.local.repository}/org/antlr/antlr-runtime/${antlr.version}/antlr-runtime-${antlr.version}.jar</value>
+  <value>${maven.local.repository}/org/apache/hive/hive-it-util/${hive.version}/hive-it-util-${hive.version}.jar:${maven.local.repository}/org/apache/hive/hive-exec/${hive.version}/hive-exec-${hive.version}.jar:${maven.local.repository}/org/antlr/antlr-runtime/${antlr.version}/antlr-runtime-${antlr.version}.jar:${maven.local.repository}/org/apache/hadoop/hadoop-common/${hadoop.version}/hadoop-common-${hadoop.version}.jar</value>
+</property>
+
+<property>
+  <name>spark.executor.extraClassPath</name>
+  <value>${maven.local.repository}/org/apache/hadoop/hadoop-common/${hadoop.version}/hadoop-common-${hadoop.version}.jar</value>
diff --git a/pom.xml b/pom.xml
index 9c15328ecc..5e100fd45b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1329,6 +1329,7 @@
   ${test.conf.dir}/krb5.conf
   ${antlr.version}
+  ${hadoop.version}
   ${qfile}
   ${initScript}
   ${clustermode}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DefaultMemoryExhaustionChecker.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DefaultMemoryExhaustionChecker.java
new file mode 100644
index 0000000000..9dbf284cfc
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DefaultMemoryExhaustionChecker.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionHandler;
+import org.apache.hadoop.hive.ql.plan.HashTableSinkDesc;
+import org.apache.hadoop.hive.ql.session.SessionState;
+
+/**
+ * A {@link MemoryExhaustionChecker} that uses a {@link MapJoinMemoryExhaustionHandler} to check
+ * memory overhead.
+ */
+class DefaultMemoryExhaustionChecker implements MemoryExhaustionChecker {
+
+  private MapJoinMemoryExhaustionHandler memoryExhaustionHandler;
+
+  DefaultMemoryExhaustionChecker(SessionState.LogHelper console,
+      HashTableSinkDesc hashTableSinkDesc) {
+    super();
+    this.memoryExhaustionHandler = new MapJoinMemoryExhaustionHandler(console,
+        hashTableSinkDesc.getHashtableMemoryUsage());
+  }
+
+  @Override
+  public void checkMemoryOverhead(long rowNumber, long hashTableScale, int tableContainerSize) {
+    if (rowNumber > hashTableScale && rowNumber % hashTableScale == 0) {
+      this.memoryExhaustionHandler.checkMemoryStatus(tableContainerSize, rowNumber);
+    }
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
index f01b67e17d..36c93350d5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
@@ -32,7 +32,6 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
-import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionHandler;
 import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinEagerRowContainer;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKeyObject;
@@ -103,7 +102,7 @@
   private long rowNumber = 0;
   protected transient LogHelper console;
   private long hashTableScale;
-  private MapJoinMemoryExhaustionHandler memoryExhaustionHandler;
+  private MemoryExhaustionChecker memoryExhaustionChecker;
 
   /** Kryo ctor. */
   protected HashTableSinkOperator() {
@@ -126,7 +125,7 @@ protected void initializeOp(Configuration hconf) throws HiveException {
     super.initializeOp(hconf);
     boolean isSilent = HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVESESSIONSILENT);
     console = new LogHelper(LOG, isSilent);
-    memoryExhaustionHandler = new MapJoinMemoryExhaustionHandler(console, conf.getHashtableMemoryUsage());
+    memoryExhaustionChecker = MemoryExhaustionCheckerFactory.getChecker(console, hconf, conf);
     emptyRowContainer.addRow(emptyObjectArray);
 
     // for small tables only; so get the big table position first
@@ -255,9 +254,7 @@ public void process(Object row, int tag) throws HiveException {
         rowContainer = emptyRowContainer;
       }
       rowNumber++;
-      if (rowNumber > hashTableScale && rowNumber % hashTableScale == 0) {
-        memoryExhaustionHandler.checkMemoryStatus(tableContainer.size(), rowNumber);
-      }
+      memoryExhaustionChecker.checkMemoryOverhead(rowNumber, hashTableScale, tableContainer.size());
       tableContainer.put(key, rowContainer);
     } else if (rowContainer == emptyRowContainer) {
       rowContainer = rowContainer.copy();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MemoryExhaustionChecker.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MemoryExhaustionChecker.java
new file mode 100644
index 0000000000..594ac18017
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MemoryExhaustionChecker.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec;
+
+/**
+ * Checks the memory overhead when running {@link HashTableSinkOperator}. Throws a
+ * {@link org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionError} if too much memory
+ * is being used.
+ */
+interface MemoryExhaustionChecker {
+
+  void checkMemoryOverhead(long rowNumber, long hashTableScale, int tableContainerSize);
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MemoryExhaustionCheckerFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MemoryExhaustionCheckerFactory.java
new file mode 100644
index 0000000000..a1840fa142
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MemoryExhaustionCheckerFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.plan.HashTableSinkDesc;
+import org.apache.hadoop.hive.ql.session.SessionState;
+
+/** Creates the {@link MemoryExhaustionChecker} that matches the configured execution engine. */
+class MemoryExhaustionCheckerFactory {
+
+  private MemoryExhaustionCheckerFactory() {
+    // No instantiation; only the static factory method should be used
+  }
+
+  static MemoryExhaustionChecker getChecker(SessionState.LogHelper console, Configuration conf,
+      HashTableSinkDesc hashTableSinkDesc) {
+    if ("spark".equals(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE))) {
+      return SparkMemoryExhaustionChecker.get(conf);
+    } else {
+      return new DefaultMemoryExhaustionChecker(console, hashTableSinkDesc);
+    }
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkMemoryExhaustionChecker.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkMemoryExhaustionChecker.java
new file mode 100644
index 0000000000..880056164a
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkMemoryExhaustionChecker.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.GcTimeMonitor;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionError;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link MemoryExhaustionChecker} specific to Hive-on-Spark. Unlike the
+ * {@link DefaultMemoryExhaustionChecker}, it uses a {@link GcTimeMonitor} to monitor how much
+ * time is spent in GC. If this value exceeds the value configured in
+ * {@link HiveConf.ConfVars#HIVEHASHTABLEMAXGCTIMEPERCENTAGE}, a
+ * {@link MapJoinMemoryExhaustionError} is thrown.
+ */
+class SparkMemoryExhaustionChecker implements MemoryExhaustionChecker {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SparkMemoryExhaustionChecker.class);
+
+  private static SparkMemoryExhaustionChecker INSTANCE;
+
+  // The GC time alert functionality below is used by the checkMemoryOverhead() method.
+  // This method may be called very frequently, and if it called
+  // GcTimeMonitor.getLatestGcData() every time, it could result in unnecessary
+  // overhead due to synchronization and new object creation. So instead,
+  // GcTimeMonitor itself sets the "red flag" in lastAlertGcTimePercentage,
+  // and checkMemoryOverhead() may check it as frequently as needed.
+  private volatile int lastAlertGcTimePercentage;
+  private final int criticalGcTimePercentage;
+
+  private SparkMemoryExhaustionChecker(Configuration conf) {
+    super();
+    criticalGcTimePercentage = (int) (HiveConf.getFloatVar(conf,
+        HiveConf.ConfVars.HIVEHASHTABLEMAXGCTIMEPERCENTAGE) * 100);
+    GcTimeMonitor hiveGcTimeMonitor = new HiveGcTimeMonitor(criticalGcTimePercentage, LOG);
+    hiveGcTimeMonitor.start();
+  }
+
+  static synchronized SparkMemoryExhaustionChecker get(Configuration conf) {
+    if (INSTANCE == null) {
+      INSTANCE = new SparkMemoryExhaustionChecker(conf);
+    }
+    return INSTANCE;
+  }
+
+  @Override
+  public void checkMemoryOverhead(long rowNumber, long hashTableScale, int tableContainerSize) {
+    if (lastAlertGcTimePercentage >= criticalGcTimePercentage) {
+      String msg = "GC time percentage = " + lastAlertGcTimePercentage + "% exceeded threshold " +
+          criticalGcTimePercentage + "%";
+      throw new MapJoinMemoryExhaustionError(msg);
+    }
+  }
+
+  // GC time monitoring
+  class HiveGcTimeMonitor extends GcTimeMonitor {
+
+    HiveGcTimeMonitor(int criticalGcTimePercentage, Logger log) {
+      super(45 * 1000, 500, criticalGcTimePercentage, gcData -> {
+        lastAlertGcTimePercentage = gcData.getGcTimePercentage();
+        log.warn("GcTimeMonitor alert called. Current GC time = " + lastAlertGcTimePercentage +
+            "%");
+      });
+    }
+  }
+}
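
Illustration (not part of the patch): the snippet below is a minimal sketch of how the GcTimeMonitor introduced above could be wired up by a caller, using only the API added in this diff (the four-argument constructor, the GcTimeAlertHandler callback, start() and shutdown()). The class name GcMonitorExample and the window/threshold values are made-up placeholders, not recommendations.

    // Hypothetical caller, for illustration only; not part of the patch.
    import org.apache.hadoop.hive.common.GcTimeMonitor;

    public class GcMonitorExample {
      public static void main(String[] args) throws Exception {
        // Watch a 60-second window, sample every second, alert above 30% GC time.
        GcTimeMonitor monitor = new GcTimeMonitor(60 * 1000, 1000, 30,
            gcData -> System.err.println("GC overhead " + gcData.getGcTimePercentage() +
                "% after " + gcData.getGcMonitorRunTimeMs() + " ms of monitoring"));
        monitor.start();
        try {
          Thread.sleep(10 * 1000);   // stand-in for real application work
        } finally {
          monitor.shutdown();        // stop the daemon thread once it is no longer needed
        }
      }
    }

Because GcTimeAlertHandler declares a single method, it can be supplied as a lambda, exactly as SparkMemoryExhaustionChecker does. On the Hive side the threshold comes from hive.mapjoin.max.gc.time.percentage (a 0..1 fraction, e.g. 0.6), which SparkMemoryExhaustionChecker converts to a 0..100 percentage before handing it to the monitor.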