commit 042f38bae9013473f8b8e05bc05c2e01bc03bb1b
Author: Sahil Takiar
Date:   Mon Jul 16 10:26:21 2018 -0500

    Spark session timeout

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 858c63011b..9bc79c370b 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -4291,6 +4291,9 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         "specified (default) then the spark-submit shell script is used to launch the Spark " +
         "app. If " + HIVE_SPARK_LAUNCHER_CLIENT + " is specified then Spark's " +
         "InProcessLauncher is used to programmatically launch the app."),
+    SPARK_SESSION_TIMEOUT("hive.spark.session.timeout", "30m", new TimeValidator(TimeUnit.SECONDS,
+        300L, true, null, true), "Amount of time the Spark Remote Driver should wait for " +
+        "a Spark job to be submitted before shutting down."),
     NWAYJOINREORDER("hive.reorder.nway.joins", true,
       "Runs reordering of tables within single n-way join (i.e.: picks streamtable)"),
     HIVE_MERGE_NWAY_JOINS("hive.merge.nway.joins", true,
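The new hive.spark.session.timeout value is consumed through HiveConf's time-variable API, which parses unit-suffixed strings such as "30m" or "30s". A minimal sketch of reading it back, assuming a Hive classpath (ReadTimeoutDemo is an illustrative name, not part of the patch):

import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hive.conf.HiveConf;

public class ReadTimeoutDemo {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();
    // Programmatic setVar does not run the TimeValidator, so even values
    // below the declared 300-second floor are accepted here; the floor
    // applies when the value is validated, e.g. through a SET command.
    conf.setVar(HiveConf.ConfVars.SPARK_SESSION_TIMEOUT, "45m");
    long timeoutMs = conf.getTimeVar(HiveConf.ConfVars.SPARK_SESSION_TIMEOUT,
        TimeUnit.MILLISECONDS);
    System.out.println("Spark session timeout: " + timeoutMs + " ms");
  }
}

This lack of validation on setVar is also why the test below can use a 30-second timeout despite the 300-second minimum.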
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkSessionTimeout.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkSessionTimeout.java
new file mode 100644
index 0000000000..71461a8cd4
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkSessionTimeout.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession;
+import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory;
+import org.apache.hadoop.hive.ql.session.SessionState;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.file.Paths;
+
+public class TestSparkSessionTimeout {
+
+  @Test
+  public void testSparkSessionTimeout() throws HiveException, InterruptedException {
+    HiveConf conf = new HiveConf();
+    conf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
+        SQLStdHiveAuthorizerFactory.class.getName());
+    conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+    conf.setVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "spark");
+    conf.set("spark.master", "local-cluster[1,2,1024]");
+    conf.set("spark.local.dir", Paths.get(System.getProperty("test.tmp.dir"),
+        "TestSparkSessionTimeout-local-dir").toString());
+    conf.setVar(HiveConf.ConfVars.SPARK_SESSION_TIMEOUT, "30s");
+
+    SessionState.start(conf);
+
+    Driver driver = null;
+
+    try {
+      driver = new Driver(new QueryState.Builder()
+          .withGenerateNewQueryId(true)
+          .withHiveConf(conf).build(),
+          null, null);
+
+      SparkSession sparkSession = SparkUtilities.getSparkSession(conf, SparkSessionManagerImpl
+          .getInstance());
+
+      Assert.assertEquals(0, driver.run("create table test (col int)").getResponseCode());
+      Assert.assertEquals(0, driver.run("select * from test order by col").getResponseCode());
+
+      // 30s timeout checked on a 30s period, so 60s guarantees the watchdog fires
+      Thread.sleep(60000);
+
+      Assert.assertFalse(sparkSession.isOpen());
+
+      Assert.assertEquals(0, driver.run("select * from test order by col").getResponseCode());
+    } finally {
+      if (driver != null) {
+        Assert.assertEquals(0, driver.run("drop table if exists test").getResponseCode());
+        driver.destroy();
+      }
+    }
+  }
+}
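The fixed Thread.sleep(60000) works because a 30-second timeout is checked on a 30-second period, but it is sensitive to scheduler jitter. A bounded poll, sketched below as a possible drop-in replacement for the sleep inside the test (same identifiers as the test above; an alternative, not what the patch does):

// Poll for up to two minutes instead of sleeping a fixed 60 seconds.
long deadline = System.currentTimeMillis() + 120000;
while (sparkSession.isOpen() && System.currentTimeMillis() < deadline) {
  Thread.sleep(1000);
}
Assert.assertFalse(sparkSession.isOpen());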
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
index ad5049a3e9..4d751f8b71 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
@@ -216,6 +216,7 @@ public int execute(DriverContext driverContext) {
           LOG.error("Failed to return the session to SessionManager", ex);
         }
       }
+      jobRef.close();
     }
     return rc;
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
index 0f2f0315a3..78c58bafa8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
@@ -19,8 +19,14 @@
 import java.io.IOException;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.function.Predicate;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -28,6 +34,7 @@
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -49,8 +56,10 @@ import com.google.common.base.Preconditions;
 
 public class SparkSessionImpl implements SparkSession {
+  private static final Logger LOG = LoggerFactory.getLogger(SparkSession.class);
   private static final String SPARK_DIR = "_spark_session_dir";
+  private static final long SESSION_TIMEOUT_SCHEDULE_PERIOD = 30000; // 30 seconds
 
   /** Regex for different Spark session error messages */
   private static final String AM_TIMEOUT_ERR = ".*ApplicationMaster for attempt.*timed out.*";
@@ -65,12 +74,16 @@
   /** Pre-compiled error patterns. Shared between all Spark sessions */
   private static Map<String, Pattern> errorPatterns;
 
-  private HiveConf conf;
-  private boolean isOpen;
+  private volatile HiveConf conf;
+  private volatile boolean isOpen;
   private final String sessionId;
-  private HiveSparkClient hiveSparkClient;
-  private Path scratchDir;
+  private volatile HiveSparkClient hiveSparkClient;
+  private volatile Path scratchDir;
   private final Object dirLock = new Object();
+  private volatile boolean firstSparkJobStarted;
+  private volatile long lastSparkJobCompletionTime;
+  private final Set<SparkJobRef> activeJobs = Sets.newConcurrentHashSet();
+  private volatile Future<?> timeoutFuture;
 
   public SparkSessionImpl() {
     sessionId = makeSessionId();
@@ -94,13 +107,37 @@ public void open(HiveConf conf) throws HiveException {
       }
       throw he;
     }
+    startTimeoutThread();
     LOG.info("Hive on Spark session {} successfully opened", sessionId);
   }
 
+  private void startTimeoutThread() {
+    long sessionTimeout = conf.getTimeVar(HiveConf.ConfVars.SPARK_SESSION_TIMEOUT, TimeUnit
+        .MILLISECONDS);
+    ScheduledExecutorService es = Executors.newSingleThreadScheduledExecutor();
+    timeoutFuture = es.scheduleAtFixedRate(() -> {
+      // Harvest completion times from finished jobs, then evict them
+      activeJobs.forEach(jobRef -> {
+        if (!jobRef.isOpen() && lastSparkJobCompletionTime < jobRef.getJobCompletionTime()) {
+          lastSparkJobCompletionTime = jobRef.getJobCompletionTime();
+        }
+      });
+      activeJobs.removeIf(jobRef -> !jobRef.isOpen());
+      if (firstSparkJobStarted && activeJobs.isEmpty() && lastSparkJobCompletionTime > 0
+          && (System.currentTimeMillis() - lastSparkJobCompletionTime) > sessionTimeout) {
+        LOG.warn("Closing Spark session because a Spark job has not been run in the past "
+            + sessionTimeout / 1000 + " seconds");
+        close();
+      }
+    }, 0, SESSION_TIMEOUT_SCHEDULE_PERIOD, TimeUnit.MILLISECONDS);
+  }
+
   @Override
   public SparkJobRef submit(DriverContext driverContext, SparkWork sparkWork) throws Exception {
     Preconditions.checkState(isOpen, "Hive on Spark session is not open. Can't submit jobs.");
-    return hiveSparkClient.execute(driverContext, sparkWork);
+    SparkJobRef sparkJobRef = hiveSparkClient.execute(driverContext, sparkWork);
+    activeJobs.add(sparkJobRef);
+    if (!firstSparkJobStarted) firstSparkJobStarted = true;
+    return sparkJobRef;
   }
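startTimeoutThread() is an instance of a general idle-watchdog pattern: a single-threaded scheduler periodically compares the clock against the last recorded activity and tears the resource down once the idle window is exceeded. A self-contained sketch of that pattern with the Hive specifics stripped out (all names here are illustrative, not from the patch):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class IdleWatchdog {
  private final ScheduledExecutorService scheduler =
      Executors.newSingleThreadScheduledExecutor();
  private volatile long lastActivity = System.currentTimeMillis();

  // Check every checkPeriodMs; once idle longer than timeoutMs, run the
  // shutdown hook and stop checking.
  void start(long timeoutMs, long checkPeriodMs, Runnable onTimeout) {
    scheduler.scheduleAtFixedRate(() -> {
      if (System.currentTimeMillis() - lastActivity > timeoutMs) {
        onTimeout.run();
        scheduler.shutdown();
      }
    }, 0, checkPeriodMs, TimeUnit.MILLISECONDS);
  }

  // Callers record activity (here: Spark job completions) through this.
  void touch() {
    lastActivity = System.currentTimeMillis();
  }
}

The patch adds two refinements on top of this basic shape: firstSparkJobStarted keeps a session that has never run a job from being reaped, and activeJobs ensures a session with a job still running is never considered idle.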
@@ -165,6 +202,9 @@ public void close() {
       }
     }
     hiveSparkClient = null;
+    firstSparkJobStarted = false;
+    lastSparkJobCompletionTime = 0;
+    timeoutFuture.cancel(false);
   }
 
   private Path createScratchDir() throws IOException {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobRef.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobRef.java
index e112882db4..553057aff8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobRef.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobRef.java
@@ -19,11 +19,17 @@
 
 public interface SparkJobRef {
 
-  public String getJobId();
+  String getJobId();
 
-  public SparkJobStatus getSparkJobStatus();
+  SparkJobStatus getSparkJobStatus();
 
-  public boolean cancelJob();
+  boolean cancelJob();
 
-  public int monitorJob();
+  int monitorJob();
+
+  boolean isOpen();
+
+  void close();
+
+  long getJobCompletionTime();
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobRef.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobRef.java
index 18b66c6594..6d84421178 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobRef.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobRef.java
@@ -29,17 +29,20 @@
   private final HiveConf hiveConf;
   private final LocalSparkJobStatus sparkJobStatus;
   private final JavaSparkContext javaSparkContext;
+  private volatile boolean isOpen;
+  private volatile long jobCompletionTime;
 
   public LocalSparkJobRef(
-    String jobId,
-    HiveConf hiveConf,
-    LocalSparkJobStatus sparkJobStatus,
-    JavaSparkContext javaSparkContext) {
+      String jobId,
+      HiveConf hiveConf,
+      LocalSparkJobStatus sparkJobStatus,
+      JavaSparkContext javaSparkContext) {
     this.jobId = jobId;
     this.hiveConf = hiveConf;
     this.sparkJobStatus = sparkJobStatus;
     this.javaSparkContext = javaSparkContext;
+    this.isOpen = true;
   }
 
   @Override
@@ -56,6 +59,7 @@ public SparkJobStatus getSparkJobStatus() {
   public boolean cancelJob() {
     int id = Integer.parseInt(jobId);
     javaSparkContext.sc().cancelJob(id);
+    this.isOpen = false;
     return true;
   }
 
@@ -64,4 +68,20 @@ public int monitorJob() {
     LocalSparkJobMonitor localSparkJobMonitor = new LocalSparkJobMonitor(hiveConf, sparkJobStatus);
     return localSparkJobMonitor.startMonitor();
   }
+
+  @Override
+  public boolean isOpen() {
+    return this.isOpen;
+  }
+
+  @Override
+  public void close() {
+    this.isOpen = false;
+    this.jobCompletionTime = System.currentTimeMillis();
+  }
+
+  @Override
+  public long getJobCompletionTime() {
+    return this.jobCompletionTime;
+  }
 }
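Together, the widened SparkJobRef interface and its implementations give the session's watchdog exactly two facts per job: whether it is still open, and when it was closed. A toy, runnable illustration of that open/close/reap lifecycle (FakeJobRef and LifecycleDemo are stand-ins, not patch code):

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class FakeJobRef {
  private volatile boolean isOpen = true;
  private volatile long jobCompletionTime;

  void close() {
    isOpen = false;
    jobCompletionTime = System.currentTimeMillis();
  }

  boolean isOpen() { return isOpen; }
  long getJobCompletionTime() { return jobCompletionTime; }
}

public class LifecycleDemo {
  public static void main(String[] args) {
    Set<FakeJobRef> activeJobs = ConcurrentHashMap.newKeySet();
    FakeJobRef ref = new FakeJobRef();  // submit(): ref starts open and is tracked
    activeJobs.add(ref);

    ref.close();                        // SparkTask.execute(): job finished

    // One watchdog pass: harvest the newest completion time, then evict
    // closed refs, mirroring the forEach/removeIf pair in SparkSessionImpl.
    long last = activeJobs.stream()
        .filter(r -> !r.isOpen())
        .mapToLong(FakeJobRef::getJobCompletionTime)
        .max().orElse(0L);
    activeJobs.removeIf(r -> !r.isOpen());
    System.out.println("last completion: " + last + ", still active: " + activeJobs.size());
  }
}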
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobRef.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobRef.java
index e130eb6d31..9e87eff6c7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobRef.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobRef.java
@@ -31,12 +31,15 @@
   private final HiveConf hiveConf;
   private final RemoteSparkJobStatus sparkJobStatus;
   private final JobHandle<Serializable> jobHandler;
+  private volatile boolean isOpen;
+  private volatile long jobCompletionTime;
 
   public RemoteSparkJobRef(HiveConf hiveConf, JobHandle<Serializable> jobHandler, RemoteSparkJobStatus sparkJobStatus) {
     this.jobHandler = jobHandler;
     this.jobId = jobHandler.getClientJobId();
     this.hiveConf = hiveConf;
     this.sparkJobStatus = sparkJobStatus;
+    this.isOpen = true;
   }
 
   @Override
@@ -51,7 +54,9 @@ public SparkJobStatus getSparkJobStatus() {
 
   @Override
   public boolean cancelJob() {
-    return jobHandler.cancel(true);
+    boolean cancelled = jobHandler.cancel(true);
+    close();
+    return cancelled;
   }
 
   @Override
@@ -59,4 +64,20 @@ public int monitorJob() {
     RemoteSparkJobMonitor remoteSparkJobMonitor = new RemoteSparkJobMonitor(hiveConf, sparkJobStatus);
     return remoteSparkJobMonitor.startMonitor();
   }
+
+  @Override
+  public boolean isOpen() {
+    return this.isOpen;
+  }
+
+  @Override
+  public void close() {
+    this.isOpen = false;
+    this.jobCompletionTime = System.currentTimeMillis();
+  }
+
+  @Override
+  public long getJobCompletionTime() {
+    return this.jobCompletionTime;
+  }
 }
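Both job-ref implementations mark isOpen and jobCompletionTime volatile because close() runs on the query's task thread while the session timeout thread reads the fields; volatile publishes the writes across threads without locking. A minimal, runnable demonstration of that handoff (class name illustrative, not from the patch):

public class VisibilityDemo {
  private volatile boolean isOpen = true;
  private volatile long jobCompletionTime;

  void close() {  // writer: the query/task thread
    isOpen = false;
    jobCompletionTime = System.currentTimeMillis();
  }

  boolean idleLongerThan(long timeoutMs) {  // reader: the watchdog thread
    return !isOpen && jobCompletionTime > 0
        && System.currentTimeMillis() - jobCompletionTime > timeoutMs;
  }

  public static void main(String[] args) throws InterruptedException {
    VisibilityDemo ref = new VisibilityDemo();
    Thread task = new Thread(ref::close);
    task.start();
    task.join();
    Thread.sleep(5);  // let a little idle time accumulate
    System.out.println("idle: " + ref.idleLongerThan(0));
  }
}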