From 468f6ff9725bf564c8690d531d93ef6367d7b365 Mon Sep 17 00:00:00 2001 From: windpiger Date: Wed, 13 Jun 2018 16:32:34 +0800 Subject: [PATCH] Spark AM shut down when timeout for no dag submit --- .../java/org/apache/hadoop/hive/conf/HiveConf.java | 3 ++ .../org/apache/hive/spark/client/RemoteDriver.java | 53 ++++++++++++++++++++-- .../hive/spark/client/rpc/RpcConfiguration.java | 3 +- 3 files changed, 54 insertions(+), 5 deletions(-) diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 8a45b9cc9a..cb19e11515 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -4233,6 +4233,9 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "Channel logging level for remote Spark driver. One of {DEBUG, ERROR, INFO, TRACE, WARN}."), SPARK_RPC_SASL_MECHANISM("hive.spark.client.rpc.sasl.mechanisms", "DIGEST-MD5", "Name of the SASL mechanism to use for authentication."), + SPARK_SESSION_AM_DAG_SUBMIT_TIMEOUT_SECS("hive.spark.session.am.dag.submit.timeout.secs", 300, "Int value. Time " + + "(in seconds) for which the Spark AM should wait for a DAG to be submitted before shutting down. " + + "Only relevant in session mode. Value 0 will disable this check and allow the AM to hang around forever in idle mode."), SPARK_RPC_SERVER_ADDRESS("hive.spark.client.rpc.server.address", "", "The server address of HiverServer2 host to be used for communication between Hive client and remote Spark driver. " + "Default is empty, which means the address will be determined in the same way as for hive.server2.thrift.bind.host."
+ diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java index 8130860f2b..f9c2d6b864 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java @@ -25,10 +25,12 @@ import java.io.File; import java.io.IOException; import java.io.Serializable; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.Timer; +import java.util.TimerTask; import java.util.concurrent.Callable; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; @@ -41,11 +43,13 @@ import org.apache.commons.io.FileUtils; import org.apache.hadoop.hive.common.classification.InterfaceAudience; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hive.spark.client.metrics.Metrics; import org.apache.hive.spark.client.rpc.Rpc; import org.apache.hive.spark.client.rpc.RpcConfiguration; import org.apache.hive.spark.counter.SparkCounters; import org.apache.spark.SparkConf; +import org.apache.spark.SparkException; import org.apache.spark.SparkJobInfo; import org.apache.spark.api.java.JavaFutureAction; import org.apache.spark.api.java.JavaSparkContext; @@ -170,6 +174,12 @@ public String toString() { try { JavaSparkContext sc = new JavaSparkContext(conf); + + String sessionTimeoutInterval = conf.get(HiveConf.ConfVars.SPARK_SESSION_AM_DAG_SUBMIT_TIMEOUT_SECS.varname, null); + if (sessionTimeoutInterval == null || Integer.parseInt(sessionTimeoutInterval) == 0) { + sc.sc().addSparkListener(new ClientListener()); + } else { + sc.sc().addSparkListener(new ClientListener(this, 1000 * Integer.parseInt(sessionTimeoutInterval))); + } - sc.sc().addSparkListener(new ClientListener()); synchronized (jcLock) { jc = new JobContextImpl(sc, localTmpDir); @@ -487,6 +497,44 @@ private void monitorJob(JavaFutureAction job, private class ClientListener
extends SparkListener { private final Map stageToJobId = Maps.newHashMap(); + private long sessionTimeoutInterval; + private volatile long lastDAGCompletionTime = System.currentTimeMillis(); + private RemoteDriver driver; + + ClientListener() {} + + ClientListener(RemoteDriver driver, long sessionTimeoutInterval) { + this.sessionTimeoutInterval = sessionTimeoutInterval; + this.driver = driver; + new Timer("DAGSubmissionTimer", true).scheduleAtFixedRate(new TimerTask() { + @Override + public void run() { + try { + if (stageToJobId.isEmpty()) { + checkAndHandleSessionTimeout(); + } else { + LOG.debug("Some job stages are still running, no need to check session timeout(stages size:" + stageToJobId.size() + ")"); + } + } catch (SparkException e) { + LOG.error("Error when checking AM session timeout", e); + } + } + }, sessionTimeoutInterval, sessionTimeoutInterval / 10); + } + + private synchronized void checkAndHandleSessionTimeout() throws SparkException { + long currentTime = System.currentTimeMillis(); + if (currentTime < (lastDAGCompletionTime + sessionTimeoutInterval)) { + return; + } + String message = "Session timed out" + + ", lastDAGCompletionTime=" + lastDAGCompletionTime + " ms" + + ", sessionTimeoutInterval=" + sessionTimeoutInterval + " ms"; + LOG.info(message); + if (driver != null) { + driver.shutdown(null); + } + } @Override public void onJobStart(SparkListenerJobStart jobStart) { @@ -513,6 +561,7 @@ public void onJobEnd(SparkListenerJobEnd jobEnd) { if (clientId != null) { activeJobs.get(clientId).jobDone(); } + lastDAGCompletionTime = System.currentTimeMillis(); } @Override diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java index bd3a7a7321..b826cca40e 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java @@
-53,7 +53,8 @@ HiveConf.ConfVars.SPARK_RPC_MAX_MESSAGE_SIZE.varname, HiveConf.ConfVars.SPARK_RPC_MAX_THREADS.varname, HiveConf.ConfVars.SPARK_RPC_SECRET_RANDOM_BITS.varname, - HiveConf.ConfVars.SPARK_RPC_SERVER_ADDRESS.varname + HiveConf.ConfVars.SPARK_RPC_SERVER_ADDRESS.varname, + HiveConf.ConfVars.SPARK_SESSION_AM_DAG_SUBMIT_TIMEOUT_SECS.varname ); public static final ImmutableSet HIVE_SPARK_TIME_CONFIGS = ImmutableSet.of( HiveConf.ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT.varname, -- 2.14.2