diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 7ce29e1..565b63f 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3079,6 +3079,10 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal
         Constants.LLAP_LOGGER_NAME_CONSOLE),
       "logger used for llap-daemons."),
 
+    SPARK_SHUTDOWN_JOB_DRIVER("hive.spark.shutdown.job.driver", false,
+        "Whether to shutdown the job driver to release resources after the spark job is done.\n" +
+        "If set to true, the following jobs cannot reuse the resources and may take longer time to submit.\n" +
+        "Thus, only set to true if the jobs are submitted infrequently."),
     SPARK_CLIENT_FUTURE_TIMEOUT("hive.spark.client.future.timeout", "60s",
       new TimeValidator(TimeUnit.SECONDS),
       "Timeout for requests from Hive client to remote Spark driver."),
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
index a705dfc..751ceb0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
@@ -173,7 +173,7 @@ public int getDefaultParallelism() throws Exception {
   @Override
   public SparkJobRef execute(final DriverContext driverContext, final SparkWork sparkWork)
       throws Exception {
-    if (hiveConf.get("spark.master").startsWith("yarn-") && !remoteClient.isActive()) {
+    if (!remoteClient.isActive()) {
       // Re-create the remote client if not active any more
       close();
       createRemoteClient();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
index 6597a51..cd8e239 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
@@ -113,6 +113,9 @@ public int execute(DriverContext driverContext) {
         // it's difficult to do it on hive side alone. See HIVE-12650.
         jobRef.cancelJob();
       }
+      if (conf.getBoolVar(HiveConf.ConfVars.SPARK_SHUTDOWN_JOB_DRIVER)) {
+        jobRef.stop();
+      }
       sparkJobStatus.cleanup();
     } catch (Exception e) {
       String msg = "Failed to execute spark task, with exception '" + Utilities.getNameMessage(e) + "'";
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobRef.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobRef.java
index fcf5368..9b51803 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobRef.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobRef.java
@@ -26,4 +26,6 @@
   public boolean cancelJob();
 
   public int monitorJob();
+
+  public void stop();
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobRef.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobRef.java
index ce4d932..309de9b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobRef.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobRef.java
@@ -64,4 +64,9 @@ public int monitorJob() {
     LocalSparkJobMonitor localSparkJobMonitor = new LocalSparkJobMonitor(hiveConf, sparkJobStatus);
     return localSparkJobMonitor.startMonitor();
   }
+
+  @Override
+  public void stop() {
+    javaSparkContext.stop();
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobRef.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobRef.java
index 4c0993c..8a39cb6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobRef.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobRef.java
@@ -59,4 +59,9 @@ public int monitorJob() {
     RemoteSparkJobMonitor remoteSparkJobMonitor = new RemoteSparkJobMonitor(hiveConf, sparkJobStatus);
     return remoteSparkJobMonitor.startMonitor();
   }
+
+  @Override
+  public void stop() {
+    jobHandler.stop();
+  }
 }
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java b/spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java
index c02c403..b118bf3 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java
@@ -60,6 +60,11 @@
    */
   State getState();
 
+  /**
+   * Shuts down the client backing this job so its resources are released.
+   */
+  void stop();
+
   /**
    * The current state of the submitted job.
    */
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java b/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java
index 7645702..96aa4c5 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java
@@ -236,4 +236,8 @@ protected void finalize() {
     }
   }
 
+  @Override
+  public void stop() {
+    client.stop();
+  }
 }