diff --git a/data/conf/spark/standalone/hive-site.xml b/data/conf/spark/standalone/hive-site.xml
index 79e388ec06..317631c16e 100644
--- a/data/conf/spark/standalone/hive-site.xml
+++ b/data/conf/spark/standalone/hive-site.xml
@@ -236,7 +236,12 @@
<name>spark.driver.extraClassPath</name>
- <value>${maven.local.repository}/org/apache/hive/hive-it-util/${hive.version}/hive-it-util-${hive.version}.jar:${maven.local.repository}/org/apache/hive/hive-exec/${hive.version}/hive-exec-${hive.version}.jar:${maven.local.repository}/org/antlr/antlr-runtime/${antlr.version}/antlr-runtime-${antlr.version}.jar</value>
+ <value>${maven.local.repository}/org/apache/hive/hive-it-util/${hive.version}/hive-it-util-${hive.version}.jar:${maven.local.repository}/org/apache/hive/hive-exec/${hive.version}/hive-exec-${hive.version}.jar:${maven.local.repository}/org/antlr/antlr-runtime/${antlr.version}/antlr-runtime-${antlr.version}.jar:${maven.local.repository}/org/apache/hadoop/hadoop-common/${hadoop.version}/hadoop-common-${hadoop.version}.jar</value>
+ </property>
+
+ <property>
+ <name>spark.executor.extraClassPath</name>
+ <value>${maven.local.repository}/org/apache/hadoop/hadoop-common/${hadoop.version}/hadoop-common-${hadoop.version}.jar</value>
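
Both classpath additions exist because the Java changes below use org.apache.hadoop.util.GcTimeMonitor, which ships in hadoop-common; Hive operators run inside both the Spark driver and the executors, so both processes need the jar. A minimal sketch of the failure mode this avoids (the probe itself is hypothetical, not part of the patch):

    // Hypothetical classpath probe, assuming a Spark executor JVM in the Hive-on-Spark tests.
    // Without the hadoop-common entries above, this lookup fails; in practice the missing class
    // would surface as a NoClassDefFoundError when Operator.initialize() constructs the monitor.
    try {
      Class.forName("org.apache.hadoop.util.GcTimeMonitor");
    } catch (ClassNotFoundException e) {
      throw new IllegalStateException("hadoop-common is missing from the executor classpath", e);
    }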
diff --git a/pom.xml b/pom.xml
index 7503cff532..7b588fbef9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1327,6 +1327,7 @@
<java.security.krb5.conf>${test.conf.dir}/krb5.conf</java.security.krb5.conf>
<antlr.version>${antlr.version}</antlr.version>
+ <hadoop.version>${hadoop.version}</hadoop.version>
<qfile>${qfile}</qfile>
<initScript>${initScript}</initScript>
<clustermode>${clustermode}</clustermode>
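
The new surefire systemPropertyVariables entry passes Maven's hadoop.version into the forked test JVM as a system property, which is what lets the ${hadoop.version} placeholders added to the test hive-site.xml above resolve. A hedged sketch of the effect inside a test (the assertion is illustrative, not part of the patch):

    // Hypothetical check inside a test: surefire exports hadoop.version to the forked JVM,
    // and Hadoop Configuration variable expansion then substitutes it when the test
    // hive-site.xml is loaded.
    String hadoopVersion = System.getProperty("hadoop.version");
    assert hadoopVersion != null : "hadoop.version was not passed through by surefire";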
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
index f01b67e17d..73592b9edb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
@@ -32,7 +32,6 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.CompilationOpContext;
-import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionHandler;
import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper;
import org.apache.hadoop.hive.ql.exec.persistence.MapJoinEagerRowContainer;
import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKeyObject;
@@ -103,7 +102,6 @@
private long rowNumber = 0;
protected transient LogHelper console;
private long hashTableScale;
- private MapJoinMemoryExhaustionHandler memoryExhaustionHandler;
/** Kryo ctor. */
protected HashTableSinkOperator() {
@@ -126,7 +124,6 @@ protected void initializeOp(Configuration hconf) throws HiveException {
super.initializeOp(hconf);
boolean isSilent = HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVESESSIONSILENT);
console = new LogHelper(LOG, isSilent);
- memoryExhaustionHandler = new MapJoinMemoryExhaustionHandler(console, conf.getHashtableMemoryUsage());
emptyRowContainer.addRow(emptyObjectArray);
// for small tables only; so get the big table position first
@@ -230,6 +227,8 @@ protected void initializeOp(Configuration hconf) throws HiveException {
*/
@Override
public void process(Object row, int tag) throws HiveException {
+ checkGcOverhead();
+
byte alias = (byte)tag;
// compute keys and values as StandardObjects. Use non-optimized key (MR).
Object[] currentKey = new Object[joinKeys[alias].size()];
@@ -255,9 +254,6 @@ public void process(Object row, int tag) throws HiveException {
rowContainer = emptyRowContainer;
}
rowNumber++;
- if (rowNumber > hashTableScale && rowNumber % hashTableScale == 0) {
- memoryExhaustionHandler.checkMemoryStatus(tableContainer.size(), rowNumber);
- }
tableContainer.put(key, rowContainer);
} else if (rowContainer == emptyRowContainer) {
rowContainer = rowContainer.copy();
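
The per-row memory check that sampled MapJoinMemoryExhaustionHandler.checkMemoryStatus() every hashTableScale rows is replaced by the much cheaper checkGcOverhead() read at the top of process(). As a hedged illustration, any other operator that accumulates large in-memory state could adopt the same guard (the operator body and bufferedRows field here are hypothetical):

    // Hypothetical state-accumulating operator using the new static guard from Operator.
    @Override
    public void process(Object row, int tag) throws HiveException {
      checkGcOverhead();                       // throws MapJoinMemoryExhaustionError once GC time is critical
      bufferedRows.add(row);                   // illustrative in-memory accumulation
      forward(row, inputObjInspectors[tag]);   // standard Operator forwarding
    }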
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
index 38316bf7fa..45386a906a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
@@ -36,8 +36,10 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionError;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.lib.Node;
@@ -58,6 +60,7 @@
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.GcTimeMonitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -126,6 +129,35 @@
private transient boolean multiChildren;
private transient int[] selected;
+ // GC time monitoring
+ static class HiveGcTimeMonitor extends GcTimeMonitor {
+ private final Logger log;
+
+ HiveGcTimeMonitor(int criticalGcTimePercentage, Logger log) {
+ super(45 * 1000, 500, criticalGcTimePercentage,
+ new GcTimeMonitor.GcTimeAlertHandler() {
+ @Override
+ public void alert(GcTimeMonitor.GcData gcData) {
+ lastAlertGcTimePercentage = gcData.getGcTimePercentage();
+ log.warn("GcTimeMonitor alert called. Current GC time % = " +
+ lastAlertGcTimePercentage);
+ }
+ });
+ this.log = log;
+ }
+ }
+
+ // The GC time alert machinery below backs the checkGcOverhead() method.
+ // That method may be called very frequently, and calling
+ // GcTimeMonitor.getLatestGcData() on every invocation would add unnecessary
+ // overhead from synchronization and new object creation. Instead, the
+ // GcTimeMonitor alert handler raises the "red flag" by updating
+ // lastAlertGcTimePercentage, and checkGcOverhead() can read that volatile
+ // field as often as needed.
+ private static final int CRITICAL_GC_TIME_PERCENTAGE_PROD = 50; // In prod, 50% in GC is a lot
+ private static final int CRITICAL_GC_TIME_PERCENTAGE_TEST = 99; // Don't fail in tests
+ protected static volatile int lastAlertGcTimePercentage, criticalGcTimePercentage;
+ private static volatile GcTimeMonitor hiveGcTimeMonitor;
+
// dummy operator (for not increasing seqId)
protected Operator(String name, CompilationOpContext cContext) {
this();
@@ -395,6 +427,14 @@ public final void initialize(Configuration hconf, ObjectInspector[] inputOIs)
LOG.debug("Initialization Done " + id + " " + getName() + " done is reset.");
}
+ if (hiveGcTimeMonitor == null) {
+ boolean inTest = HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVE_IN_TEST);
+ criticalGcTimePercentage = inTest ?
+ CRITICAL_GC_TIME_PERCENTAGE_TEST : CRITICAL_GC_TIME_PERCENTAGE_PROD;
+ hiveGcTimeMonitor = new HiveGcTimeMonitor(criticalGcTimePercentage, LOG);
+ hiveGcTimeMonitor.start();
+ }
+
// let's wait on the async ops before continuing
completeInitialization(asyncInitOperations);
}
@@ -1619,4 +1659,12 @@ public void setBucketingVersion(int bucketingVersion) {
public int getBucketingVersion() {
return bucketingVersion;
}
+
+ public static void checkGcOverhead() {
+ if (lastAlertGcTimePercentage >= criticalGcTimePercentage) {
+ String msg = "GC time percentage = " + lastAlertGcTimePercentage +
+ ", exceeded threshold.";
+ throw new MapJoinMemoryExhaustionError(msg);
+ }
+ }
}
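
For reference, a standalone sketch of the monitoring pattern introduced in Operator.java, assuming only that hadoop-common is on the classpath; the 45-second observation window, 500 ms sampling interval, and "volatile red flag" polling mirror the patch, while the class name, threshold constant, and method names are illustrative:

    import org.apache.hadoop.util.GcTimeMonitor;

    // Illustrative guard class; names are not from the patch.
    public class GcOverheadGuard {
      private static final int CRITICAL_GC_TIME_PERCENTAGE = 50;
      private static volatile int lastAlertGcTimePercentage;   // the "red flag"
      private static volatile GcTimeMonitor monitor;

      /** Lazily starts the background monitor, mirroring Operator.initialize(). */
      public static synchronized void start() {
        if (monitor == null) {
          monitor = new GcTimeMonitor(45 * 1000, 500, CRITICAL_GC_TIME_PERCENTAGE,
              gcData -> lastAlertGcTimePercentage = gcData.getGcTimePercentage());
          monitor.start();
        }
      }

      /** Cheap per-row guard, mirroring Operator.checkGcOverhead(). */
      public static void checkGcOverhead() {
        if (lastAlertGcTimePercentage >= CRITICAL_GC_TIME_PERCENTAGE) {
          // The patch throws MapJoinMemoryExhaustionError; a generic error keeps the sketch standalone.
          throw new IllegalStateException(
              "GC time percentage = " + lastAlertGcTimePercentage + ", exceeded threshold.");
        }
      }
    }

Using a lambda for the alert handler and synchronizing the lazy start are sketch-level simplifications; the patch itself uses an anonymous GcTimeAlertHandler and an unsynchronized null check in initialize().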