diff --git a/data/conf/spark/standalone/hive-site.xml b/data/conf/spark/standalone/hive-site.xml
index 79e388ec06..317631c16e 100644
--- a/data/conf/spark/standalone/hive-site.xml
+++ b/data/conf/spark/standalone/hive-site.xml
@@ -236,7 +236,12 @@
 <property>
   <name>spark.driver.extraClassPath</name>
-  <value>${maven.local.repository}/org/apache/hive/hive-it-util/${hive.version}/hive-it-util-${hive.version}.jar:${maven.local.repository}/org/apache/hive/hive-exec/${hive.version}/hive-exec-${hive.version}.jar:${maven.local.repository}/org/antlr/antlr-runtime/${antlr.version}/antlr-runtime-${antlr.version}.jar</value>
+  <value>${maven.local.repository}/org/apache/hive/hive-it-util/${hive.version}/hive-it-util-${hive.version}.jar:${maven.local.repository}/org/apache/hive/hive-exec/${hive.version}/hive-exec-${hive.version}.jar:${maven.local.repository}/org/antlr/antlr-runtime/${antlr.version}/antlr-runtime-${antlr.version}.jar:${maven.local.repository}/org/apache/hadoop/hadoop-common/${hadoop.version}/hadoop-common-${hadoop.version}.jar</value>
+</property>
+
+<property>
+  <name>spark.executor.extraClassPath</name>
+  <value>${maven.local.repository}/org/apache/hadoop/hadoop-common/${hadoop.version}/hadoop-common-${hadoop.version}.jar</value>
 </property>
diff --git a/pom.xml b/pom.xml
index 7503cff532..7b588fbef9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1327,6 +1327,7 @@
             <java.security.krb5.conf>${test.conf.dir}/krb5.conf</java.security.krb5.conf>
             <antlr.version>${antlr.version}</antlr.version>
+            <hadoop.version>${hadoop.version}</hadoop.version>
             <qfile>${qfile}</qfile>
             <initScript>${initScript}</initScript>
             <clustermode>${clustermode}</clustermode>
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
index f01b67e17d..73592b9edb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
@@ -32,7 +32,6 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
-import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionHandler;
 import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinEagerRowContainer;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKeyObject;
@@ -103,7 +102,6 @@
   private long rowNumber = 0;
   protected transient LogHelper console;
   private long hashTableScale;
-  private MapJoinMemoryExhaustionHandler memoryExhaustionHandler;
 
   /** Kryo ctor. */
   protected HashTableSinkOperator() {
@@ -126,7 +124,6 @@ protected void initializeOp(Configuration hconf) throws HiveException {
     super.initializeOp(hconf);
     boolean isSilent = HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVESESSIONSILENT);
     console = new LogHelper(LOG, isSilent);
-    memoryExhaustionHandler = new MapJoinMemoryExhaustionHandler(console, conf.getHashtableMemoryUsage());
     emptyRowContainer.addRow(emptyObjectArray);
 
     // for small tables only; so get the big table position first
@@ -230,6 +227,8 @@
    */
   @Override
   public void process(Object row, int tag) throws HiveException {
+    checkGcOverhead();
+
     byte alias = (byte)tag;
     // compute keys and values as StandardObjects. Use non-optimized key (MR).
     Object[] currentKey = new Object[joinKeys[alias].size()];
@@ -255,9 +254,6 @@ public void process(Object row, int tag) throws HiveException {
         rowContainer = emptyRowContainer;
       }
       rowNumber++;
-      if (rowNumber > hashTableScale && rowNumber % hashTableScale == 0) {
-        memoryExhaustionHandler.checkMemoryStatus(tableContainer.size(), rowNumber);
-      }
       tableContainer.put(key, rowContainer);
     } else if (rowContainer == emptyRowContainer) {
       rowContainer = rowContainer.copy();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
index 38316bf7fa..45386a906a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
@@ -36,8 +36,10 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionError;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.lib.Node;
@@ -58,6 +60,7 @@
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.GcTimeMonitor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -126,6 +129,35 @@
   private transient boolean multiChildren;
   private transient int[] selected;
 
+  // GC time monitoring
+  static class HiveGcTimeMonitor extends GcTimeMonitor {
+    private final Logger log;
+
+    HiveGcTimeMonitor(int criticalGcTimePercentage, Logger log) {
+      super(45 * 1000, 500, criticalGcTimePercentage,
+          new GcTimeMonitor.GcTimeAlertHandler() {
+            @Override
+            public void alert(GcTimeMonitor.GcData gcData) {
+              lastAlertGcTimePercentage = gcData.getGcTimePercentage();
+              log.warn("GcTimeMonitor alert called. Current GC time % = "
+                  + lastAlertGcTimePercentage);
+            }
+          });
+      this.log = log;
+    }
+  }
+
+  // The GC time alert functionality below is used by the checkGcOverhead() method.
+  // This method may be called very frequently, and if it called
+  // GcTimeMonitor.getLatestGcData() every time, it could result in unnecessary
+  // overhead due to synchronization and new object creation. So instead,
+  // GcTimeMonitor itself sets the "red flag" in lastAlertGcTimePercentage,
+  // and checkGcOverhead() may check it as frequently as needed.
+  private static final int CRITICAL_GC_TIME_PERCENTAGE_PROD = 50; // In prod, 50% in GC is a lot
+  private static final int CRITICAL_GC_TIME_PERCENTAGE_TEST = 99; // Don't fail in tests
+  protected static volatile int lastAlertGcTimePercentage, criticalGcTimePercentage;
+  private static volatile GcTimeMonitor hiveGcTimeMonitor;
+
   // dummy operator (for not increasing seqId)
   protected Operator(String name, CompilationOpContext cContext) {
     this();
@@ -395,6 +427,14 @@ public final void initialize(Configuration hconf, ObjectInspector[] inputOIs)
       LOG.debug("Initialization Done " + id + " " + getName() + " done is reset.");
     }
 
+    if (hiveGcTimeMonitor == null) {
+      boolean inTest = HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVE_IN_TEST);
+      criticalGcTimePercentage = inTest ?
+          CRITICAL_GC_TIME_PERCENTAGE_TEST : CRITICAL_GC_TIME_PERCENTAGE_PROD;
+      hiveGcTimeMonitor = new HiveGcTimeMonitor(criticalGcTimePercentage, LOG);
+      hiveGcTimeMonitor.start();
+    }
+
     // let's wait on the async ops before continuing
     completeInitialization(asyncInitOperations);
   }
@@ -1619,4 +1659,12 @@ public void setBucketingVersion(int bucketingVersion) {
   public int getBucketingVersion() {
     return bucketingVersion;
   }
+
+  public static void checkGcOverhead() {
+    if (lastAlertGcTimePercentage >= criticalGcTimePercentage) {
+      String msg = "GC time percentage = " + lastAlertGcTimePercentage +
+          ", exceeded threshold.";
+      throw new MapJoinMemoryExhaustionError(msg);
+    }
+  }
 }
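
A note on the pattern used in the Operator.java hunks above: Hadoop's GcTimeMonitor is a monitoring thread that tracks the percentage of wall-clock time spent in GC over a sliding observation window (45 seconds here, sampled every 500 ms) and calls an alert handler once that percentage crosses the configured threshold. The handler only records the value in a volatile field, so hot-path code such as process() can call checkGcOverhead() on every row without synchronization or allocation. The flag is never reset, so once the threshold is crossed every subsequent check fails fast instead of letting the task limp toward an OutOfMemoryError. The following is a minimal, self-contained sketch of the same pattern against the GcTimeMonitor API used by the patch; the class name GcOverheadGuard, the startIfNeeded() helper, and the use of IllegalStateException in place of Hive's MapJoinMemoryExhaustionError are illustrative assumptions, not code from the patch.

import org.apache.hadoop.util.GcTimeMonitor;

public class GcOverheadGuard {
  // Threshold chosen for illustration; the patch uses 50% in production and 99% in tests.
  private static final int CRITICAL_GC_TIME_PERCENTAGE = 50;

  // Written only by the monitor's alert callback, read on the hot path.
  private static volatile int lastAlertGcTimePercentage;

  private static GcTimeMonitor monitor;

  /** Start the monitor once, e.g. from an operator's initialization path. */
  public static synchronized void startIfNeeded() {
    if (monitor == null) {
      monitor = new GcTimeMonitor(45 * 1000, 500, CRITICAL_GC_TIME_PERCENTAGE,
          new GcTimeMonitor.GcTimeAlertHandler() {
            @Override
            public void alert(GcTimeMonitor.GcData gcData) {
              // Cheap "red flag": the reader side never touches the monitor itself.
              lastAlertGcTimePercentage = gcData.getGcTimePercentage();
            }
          });
      monitor.start();
    }
  }

  /** Call from per-row or per-batch code; throws once GC overhead has crossed the threshold. */
  public static void checkGcOverhead() {
    if (lastAlertGcTimePercentage >= CRITICAL_GC_TIME_PERCENTAGE) {
      throw new IllegalStateException("GC time percentage = "
          + lastAlertGcTimePercentage + ", exceeded threshold.");
    }
  }
}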