commit 8f3fea474e97e4e96eb5e44aaa8256b2267101b6
Author: Sahil Takiar
Date:   Mon Jul 16 10:26:21 2018 -0500

    HIVE-14162: Allow disabling of long running job on Hive On Spark On YARN

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 15217e7b95..199556e1b5 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -4297,6 +4297,12 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         "specified (default) then the spark-submit shell script is used to launch the Spark " +
         "app. If " + HIVE_SPARK_LAUNCHER_CLIENT + " is specified then Spark's " +
         "InProcessLauncher is used to programmatically launch the app."),
+    SPARK_SESSION_TIMEOUT("hive.spark.session.timeout", "30m", new TimeValidator(TimeUnit.SECONDS,
+        300L, true, null, true), "Amount of time the Spark Remote Driver should wait for " +
+        "a Spark job to be submitted before shutting down."),
+    SPARK_SESSION_TIMEOUT_PERIOD("hive.spark.session.timeout.period", "30s",
+        new TimeValidator(TimeUnit.SECONDS, 30L, true, null, true),
+        "How frequently to check for idle Spark sessions."),
     NWAYJOINREORDER("hive.reorder.nway.joins", true,
       "Runs reordering of tables within single n-way join (i.e.: picks streamtable)"),
     HIVE_MERGE_NWAY_JOINS("hive.merge.nway.joins", true,
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkSessionTimeout.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkSessionTimeout.java
new file mode 100644
index 0000000000..eeab5f0c08
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkSessionTimeout.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession;
+import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.session.SessionState;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.net.MalformedURLException;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+public class TestSparkSessionTimeout {
+
+  @Test
+  public void testSparkSessionTimeout() throws HiveException, InterruptedException,
+      MalformedURLException {
+    String confDir = "../../data/conf/spark/standalone/hive-site.xml";
+    HiveConf.setHiveSiteLocation(new File(confDir).toURI().toURL());
+
+    HiveConf conf = new HiveConf();
+    conf.set("spark.local.dir", Paths.get(System.getProperty("test.tmp.dir"),
+        "TestSparkSessionTimeout-testSparkSessionTimeout-local-dir").toString());
+
+    SessionState.start(conf);
+
+    runTestSparkSessionTimeout(conf);
+  }
+
+  @Test
+  public void testMultiSessionSparkSessionTimeout() throws InterruptedException,
+      ExecutionException {
+    List<Future<Void>> futures = new ArrayList<>();
+    ExecutorService es = Executors.newFixedThreadPool(10);
+    for (int i = 0; i < 10; i++) {
+      futures.add(es.submit(() -> {
+        String confDir = "../../data/conf/spark/local/hive-site.xml";
+        HiveConf.setHiveSiteLocation(new File(confDir).toURI().toURL());
+
+        HiveConf conf = new HiveConf();
+        conf.set("spark.local.dir", Paths.get(System.getProperty("test.tmp.dir"),
+            "TestSparkSessionTimeout-testMultiSparkSessionTimeout-local-dir").toString());
+
+        SessionState.start(conf);
+
+        runTestSparkSessionTimeout(conf);
+        return null;
+      }));
+    }
+    for (Future<Void> future : futures) {
+      future.get();
+    }
+  }
+
+  @Test
+  public void testMultiSparkSessionTimeout() throws ExecutionException, InterruptedException {
+    List<Future<Void>> futures = new ArrayList<>();
+    ExecutorService es = Executors.newFixedThreadPool(10);
+    for (int i = 0; i < 10; i++) {
+      futures.add(es.submit(() -> {
+        String confDir = "../../data/conf/spark/local/hive-site.xml";
+        HiveConf.setHiveSiteLocation(new File(confDir).toURI().toURL());
+
+        HiveConf conf = new HiveConf();
+        conf.set("spark.local.dir", Paths.get(System.getProperty("test.tmp.dir"),
+            "TestSparkSessionTimeout-testMultiSparkSessionTimeout-local-dir").toString());
+
+        SessionState.start(conf);
+
+        runTestSparkSessionTimeout(conf);
+        return null;
+      }));
+    }
+    for (Future<Void> future : futures) {
+      future.get();
+    }
+  }
+
+  private void runTestSparkSessionTimeout(HiveConf conf) throws HiveException,
+      InterruptedException {
+    conf.setVar(HiveConf.ConfVars.SPARK_SESSION_TIMEOUT, "5s");
+    conf.setVar(HiveConf.ConfVars.SPARK_SESSION_TIMEOUT_PERIOD, "1s");
+
+    String tableName = "test" + UUID.randomUUID().toString().replace("-", "");
+
+    Driver driver = null;
+
+    try {
+      driver = new Driver(new QueryState.Builder()
+          .withGenerateNewQueryId(true)
+          .withHiveConf(conf).build(),
+          null, null);
+
+      SparkSession sparkSession = SparkUtilities.getSparkSession(conf, SparkSessionManagerImpl
+          .getInstance());
+
+      Assert.assertEquals(0,
+          driver.run("create table " + tableName + " (col int)").getResponseCode());
+      Assert.assertEquals(0,
+          driver.run("select * from " + tableName + " order by col").getResponseCode());
+
+      Thread.sleep(10000);
+
+      Assert.assertFalse(sparkSession.isOpen());
+
+      Assert.assertEquals(0,
+          driver.run("select * from " + tableName + " order by col").getResponseCode());
+    } finally {
+      if (driver != null) {
+        Assert.assertEquals(0, driver.run("drop table if exists " + tableName).getResponseCode());
+        driver.destroy();
+      }
+    }
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 96d7300273..7708a95489 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -84,6 +84,7 @@
 import org.apache.hadoop.hive.ql.exec.TaskResult;
 import org.apache.hadoop.hive.ql.exec.TaskRunner;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession;
 import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
 import org.apache.hadoop.hive.ql.hooks.Entity;
 import org.apache.hadoop.hive.ql.hooks.Entity.Type;
@@ -542,6 +543,11 @@ private void compile(String command, boolean resetTaskIds, boolean deferClose) throws CommandProcessorResponse {

     String queryId = queryState.getQueryId();

+    SparkSession ss = SessionState.get().getSparkSession();
+    if (ss != null) {
+      ss.onQuerySubmission(queryId);
+    }
+
     if (ctx != null) {
       setTriggerContext(queryId);
     }
@@ -2547,6 +2553,10 @@ private void execute() throws CommandProcessorResponse {
       queryState.setNumModifiedRows(numModifiedRows);
       console.printInfo("Total MapReduce CPU Time Spent: " + Utilities.formatMsecToStr(totalCpu));
     }
+    SparkSession ss = SessionState.get().getSparkSession();
+    if (ss != null) {
+      ss.onQueryCompletion(queryId);
+    }
     lDrvState.stateLock.lock();
     try {
       lDrvState.driverState = executionError ? DriverState.ERROR : DriverState.EXECUTED;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java
index f96a8f77ce..4e1228a8a6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java
@@ -26,8 +26,11 @@
 import org.apache.hadoop.hive.ql.plan.SparkWork;

 import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.locks.ReadWriteLock;

 public interface SparkSession {
+
   /**
    * Initializes a Spark session for DAG execution.
    * @param conf Hive configuration.
@@ -75,4 +78,30 @@
    * Get an HDFS dir specific to the SparkSession
    * */
   Path getHDFSSessionDir() throws IOException;
+
+  /**
+   * Callback function that is invoked by the {@link org.apache.hadoop.hive.ql.Driver} when a
+   * query has completed.
+   *
+   * @param queryId the id of the query that completed
+   */
+  void onQueryCompletion(String queryId);
+
+  /**
+   * Callback function that is invoked by the {@link org.apache.hadoop.hive.ql.Driver} when a
+   * query has been submitted.
+   *
+   * @param queryId the id of the query that was submitted
+   */
+  void onQuerySubmission(String queryId);
+
+  /**
+   * Checks if the session has timed out, and closes the session if the timeout has occurred;
+   * returns true if the session timed out, and false otherwise.
+   *
+   * @param sessionTimeout the session timeout
+   *
+   * @return true if the session timed out and was closed, false otherwise
+   */
+  boolean triggerTimeout(long sessionTimeout);
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
index 0f2f0315a3..64f1c1ddb8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
@@ -19,8 +19,15 @@

 import java.io.IOException;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;

@@ -28,6 +35,7 @@
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -49,6 +57,7 @@
 import com.google.common.base.Preconditions;

 public class SparkSessionImpl implements SparkSession {
+
   private static final Logger LOG = LoggerFactory.getLogger(SparkSession.class);
   private static final String SPARK_DIR = "_spark_session_dir";

@@ -65,12 +74,16 @@
   /** Pre-compiled error patterns. Shared between all Spark sessions */
   private static Map<String, Pattern> errorPatterns;

-  private HiveConf conf;
-  private boolean isOpen;
+  private volatile HiveConf conf;
+  private volatile boolean isOpen;
   private final String sessionId;
-  private HiveSparkClient hiveSparkClient;
-  private Path scratchDir;
+  private volatile HiveSparkClient hiveSparkClient;
+  private volatile Path scratchDir;
   private final Object dirLock = new Object();
+  private volatile long lastSparkJobCompletionTime;
+  private final Set<String> activeJobs = Sets.newConcurrentHashSet();
+  private volatile boolean queryCompleted;
+  private ReadWriteLock closeLock = new ReentrantReadWriteLock();

   public SparkSessionImpl() {
     sessionId = makeSessionId();
@@ -79,66 +92,86 @@ public SparkSessionImpl() {

   @Override
   public void open(HiveConf conf) throws HiveException {
-    LOG.info("Trying to open Hive on Spark session {}", sessionId);
-    this.conf = conf;
-    isOpen = true;
+    closeLock.readLock().lock();
     try {
-      hiveSparkClient = HiveSparkClientFactory.createHiveSparkClient(conf, sessionId);
-    } catch (Throwable e) {
-      // It's possible that user session is closed while creating Spark client.
-      HiveException he;
-      if (isOpen) {
-        he = getHiveException(e);
-      } else {
-        he = new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_CLOSED_SESSION, sessionId);
+      LOG.info("Trying to open Hive on Spark session {}", sessionId);
+      this.conf = conf;
+      isOpen = true;
+      try {
+        hiveSparkClient = HiveSparkClientFactory.createHiveSparkClient(conf, sessionId);
+      } catch (Throwable e) {
+        // It's possible that user session is closed while creating Spark client.
+        HiveException he;
+        if (isOpen) {
+          he = getHiveException(e);
+        } else {
+          he = new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_CLOSED_SESSION, sessionId);
+        }
+        throw he;
       }
-      throw he;
+      LOG.info("Hive on Spark session {} successfully opened", sessionId);
+    } finally {
+      closeLock.readLock().unlock();
     }
-    LOG.info("Hive on Spark session {} successfully opened", sessionId);
   }

   @Override
   public SparkJobRef submit(DriverContext driverContext, SparkWork sparkWork) throws Exception {
-    Preconditions.checkState(isOpen, "Hive on Spark session is not open. Can't submit jobs.");
-    return hiveSparkClient.execute(driverContext, sparkWork);
+    closeLock.readLock().lock();
+    try {
+      Preconditions.checkState(isOpen, "Hive on Spark session is not open. Can't submit jobs.");
+      return hiveSparkClient.execute(driverContext, sparkWork);
+    } finally {
+      closeLock.readLock().unlock();
+    }
   }

   @Override
   public ObjectPair<Long, Integer> getMemoryAndCores() throws Exception {
-    SparkConf sparkConf = hiveSparkClient.getSparkConf();
-    int numExecutors = hiveSparkClient.getExecutorCount();
-    // at start-up, we may be unable to get number of executors
-    if (numExecutors <= 0) {
-      return new ObjectPair<Long, Integer>(-1L, -1);
-    }
-    int executorMemoryInMB = Utils.memoryStringToMb(
-        sparkConf.get("spark.executor.memory", "512m"));
-    double memoryFraction = 1.0 - sparkConf.getDouble("spark.storage.memoryFraction", 0.6);
-    long totalMemory = (long) (numExecutors * executorMemoryInMB * memoryFraction * 1024 * 1024);
-    int totalCores;
-    String masterURL = sparkConf.get("spark.master");
-    if (masterURL.startsWith("spark") || masterURL.startsWith("local")) {
-      totalCores = sparkConf.contains("spark.default.parallelism") ?
-          sparkConf.getInt("spark.default.parallelism", 1) :
-          hiveSparkClient.getDefaultParallelism();
-      totalCores = Math.max(totalCores, numExecutors);
-    } else {
-      int coresPerExecutor = sparkConf.getInt("spark.executor.cores", 1);
-      totalCores = numExecutors * coresPerExecutor;
-    }
-    totalCores = totalCores / sparkConf.getInt("spark.task.cpus", 1);
+    closeLock.readLock().lock();
+    try {
+      SparkConf sparkConf = hiveSparkClient.getSparkConf();
+      int numExecutors = hiveSparkClient.getExecutorCount();
+      // at start-up, we may be unable to get number of executors
+      if (numExecutors <= 0) {
+        return new ObjectPair<Long, Integer>(-1L, -1);
+      }
+      int executorMemoryInMB = Utils.memoryStringToMb(
+          sparkConf.get("spark.executor.memory", "512m"));
+      double memoryFraction = 1.0 - sparkConf.getDouble("spark.storage.memoryFraction", 0.6);
+      long totalMemory = (long) (numExecutors * executorMemoryInMB * memoryFraction * 1024 * 1024);
+      int totalCores;
+      String masterURL = sparkConf.get("spark.master");
+      if (masterURL.startsWith("spark") || masterURL.startsWith("local")) {
+        totalCores = sparkConf.contains("spark.default.parallelism") ?
+            sparkConf.getInt("spark.default.parallelism", 1) :
+            hiveSparkClient.getDefaultParallelism();
+        totalCores = Math.max(totalCores, numExecutors);
+      } else {
+        int coresPerExecutor = sparkConf.getInt("spark.executor.cores", 1);
+        totalCores = numExecutors * coresPerExecutor;
+      }
+      totalCores = totalCores / sparkConf.getInt("spark.task.cpus", 1);

-    long memoryPerTaskInBytes = totalMemory / totalCores;
-    LOG.info("Hive on Spark application currently has number of executors: " + numExecutors
-        + ", total cores: " + totalCores + ", memory per executor: "
-        + executorMemoryInMB + " mb, memoryFraction: " + memoryFraction);
-    return new ObjectPair<Long, Integer>(Long.valueOf(memoryPerTaskInBytes),
-        Integer.valueOf(totalCores));
+      long memoryPerTaskInBytes = totalMemory / totalCores;
+      LOG.info("Hive on Spark application currently has number of executors: " + numExecutors
+          + ", total cores: " + totalCores + ", memory per executor: "
+          + executorMemoryInMB + " mb, memoryFraction: " + memoryFraction);
+      return new ObjectPair<Long, Integer>(Long.valueOf(memoryPerTaskInBytes),
+          Integer.valueOf(totalCores));
+    } finally {
+      closeLock.readLock().unlock();
+    }
   }

   @Override
   public boolean isOpen() {
-    return isOpen;
+    closeLock.readLock().lock();
+    try {
+      return isOpen;
+    } finally {
+      closeLock.readLock().unlock();
+    }
   }

   @Override
@@ -153,18 +186,29 @@ public String getSessionId() {

   @Override
   public void close() {
-    LOG.info("Trying to close Hive on Spark session {}", sessionId);
-    isOpen = false;
-    if (hiveSparkClient != null) {
-      try {
-        hiveSparkClient.close();
-        LOG.info("Hive on Spark session {} successfully closed", sessionId);
-        cleanScratchDir();
-      } catch (IOException e) {
-        LOG.error("Failed to close Hive on Spark session (" + sessionId + ")", e);
+    if (isOpen) {
+      closeLock.writeLock().lock();
+      if (isOpen) {
+        try {
+          LOG.info("Trying to close Hive on Spark session {}", sessionId);
+          isOpen = false;
+          if (hiveSparkClient != null) {
+            try {
+              hiveSparkClient.close();
+              LOG.info("Hive on Spark session {} successfully closed", sessionId);
+              cleanScratchDir();
+            } catch (IOException e) {
+              LOG.error("Failed to close Hive on Spark session (" + sessionId + ")", e);
+            }
+          }
+          hiveSparkClient = null;
+          queryCompleted = false;
+          lastSparkJobCompletionTime = 0;
+        } finally {
+          closeLock.writeLock().unlock();
+        }
       }
     }
-    hiveSparkClient = null;
   }

   private Path createScratchDir() throws IOException {
@@ -260,6 +304,38 @@ public Path getHDFSSessionDir() throws IOException {
     return scratchDir;
   }

+  @Override
+  public void onQuerySubmission(String queryId) {
+    activeJobs.add(queryId);
+  }
+
+  @Override
+  public boolean triggerTimeout(long sessionTimeout) {
+    if (queryCompleted && activeJobs.isEmpty() && lastSparkJobCompletionTime > 0 &&
+        (System.currentTimeMillis() - lastSparkJobCompletionTime) > sessionTimeout) {
+      closeLock.writeLock().lock();
+      try {
+        if (queryCompleted && activeJobs.isEmpty() && lastSparkJobCompletionTime > 0 &&
+            (System.currentTimeMillis() - lastSparkJobCompletionTime) > sessionTimeout) {
+          LOG.warn("Closing Spark session " + getSessionId() + " because a Spark job has not " +
+              "been run in the past " + sessionTimeout / 1000 + " seconds");
+          close();
+          return true;
+        }
+      } finally {
+        closeLock.writeLock().unlock();
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public void onQueryCompletion(String queryId) {
+    if (!queryCompleted) queryCompleted = true;
+    activeJobs.remove(queryId);
+    lastSparkJobCompletionTime = System.currentTimeMillis();
+  }
+
   public static String makeSessionId() {
     return UUID.randomUUID().toString();
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java
index 46cee0d903..80e00668c8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java
@@ -23,7 +23,12 @@
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;

+import com.google.common.collect.Sets;
 import org.apache.hive.common.util.ShutdownHookManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,8 +46,10 @@
 public class SparkSessionManagerImpl implements SparkSessionManager {
   private static final Logger LOG = LoggerFactory.getLogger(SparkSessionManagerImpl.class);

-  private Set<SparkSession> createdSessions = Collections.synchronizedSet(new HashSet<SparkSession>());
+  private final Set<SparkSession> createdSessions = Sets.newConcurrentHashSet();
+  private volatile Future<?> timeoutFuture;
   private volatile boolean inited = false;
+  private volatile HiveConf conf;

   private static SparkSessionManagerImpl instance;

@@ -79,6 +86,8 @@ public void setup(HiveConf hiveConf) throws HiveException {
     synchronized (this) {
       if (!inited) {
         LOG.info("Setting up the session manager.");
+        conf = hiveConf;
+        startTimeoutThread();
         Map<String, String> conf = HiveSparkClientFactory.initiateSparkConf(hiveConf, null);
         try {
           SparkClientFactory.initialize(conf);
@@ -135,7 +144,7 @@ public void closeSession(SparkSession sparkSession) throws HiveException {
     }

     if (LOG.isDebugEnabled()) {
-      LOG.debug(String.format("Closing session (%s).", sparkSession.getSessionId()));
+      LOG.debug(String.format("Closing Spark session (%s).", sparkSession.getSessionId()));
     }
     sparkSession.close();
     createdSessions.remove(sparkSession);
@@ -144,15 +153,22 @@

   @Override
   public void shutdown() {
     LOG.info("Closing the session manager.");
-    synchronized (createdSessions) {
-      Iterator<SparkSession> it = createdSessions.iterator();
-      while (it.hasNext()) {
-        SparkSession session = it.next();
-        session.close();
-      }
-      createdSessions.clear();
-    }
+    timeoutFuture.cancel(false);
+    createdSessions.forEach(SparkSession::close);
+    createdSessions.clear();
     inited = false;
     SparkClientFactory.stop();
   }
+
+  private void startTimeoutThread() {
+    long sessionTimeout = conf.getTimeVar(HiveConf.ConfVars.SPARK_SESSION_TIMEOUT,
+        TimeUnit.MILLISECONDS);
+    long sessionTimeoutPeriod = conf.getTimeVar(HiveConf.ConfVars.SPARK_SESSION_TIMEOUT_PERIOD,
+        TimeUnit.MILLISECONDS);
+    ScheduledExecutorService es = Executors.newSingleThreadScheduledExecutor();
+    timeoutFuture = es.scheduleAtFixedRate(() -> createdSessions.stream()
+            .filter(sparkSession -> sparkSession.triggerTimeout(sessionTimeout))
+            .forEach(createdSessions::remove),
+        0, sessionTimeoutPeriod, TimeUnit.MILLISECONDS);
+  }
 }
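
Taken together, the new settings are consumed as follows: SparkSessionManagerImpl reads hive.spark.session.timeout and hive.spark.session.timeout.period when it is set up, and its background check closes any SparkSession that reports itself idle for longer than the timeout. A minimal usage sketch, not part of the patch, that mirrors the integration test above (the class name SparkSessionTimeoutUsage is hypothetical; everything else is taken from the diff):

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
    import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession;
    import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.session.SessionState;

    public final class SparkSessionTimeoutUsage {
      // Opens a Hive on Spark session that the session manager will close
      // after ten idle minutes; the idle check itself runs once a minute.
      public static SparkSession openSessionWithTimeout() throws HiveException {
        HiveConf conf = new HiveConf();
        conf.setVar(HiveConf.ConfVars.SPARK_SESSION_TIMEOUT, "10m");
        conf.setVar(HiveConf.ConfVars.SPARK_SESSION_TIMEOUT_PERIOD, "1m");
        SessionState.start(conf);
        return SparkUtilities.getSparkSession(conf, SparkSessionManagerImpl.getInstance());
      }
    }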
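The mechanism itself (onQuerySubmission/onQueryCompletion feeding SparkSessionManagerImpl.startTimeoutThread(), which calls triggerTimeout on a fixed schedule) reduces to a small pattern: track active queries and the last completion time, and let a scheduled sweeper close sessions that have been idle past the timeout. A self-contained sketch of that pattern with hypothetical names (IdleSession, IdleSessionMonitor); it is not Hive code:

    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    final class IdleSession {
      private final Set<String> activeQueries = ConcurrentHashMap.newKeySet();
      private volatile long lastCompletionTime;
      private volatile boolean open = true;

      void onQuerySubmission(String queryId) {
        activeQueries.add(queryId);
      }

      void onQueryCompletion(String queryId) {
        activeQueries.remove(queryId);
        lastCompletionTime = System.currentTimeMillis();
      }

      // Closes the session and returns true if at least one query has completed,
      // none are running, and the last completion is older than timeoutMs.
      synchronized boolean triggerTimeout(long timeoutMs) {
        boolean idle = open && activeQueries.isEmpty() && lastCompletionTime > 0
            && System.currentTimeMillis() - lastCompletionTime > timeoutMs;
        if (idle) {
          open = false; // stand-in for SparkSessionImpl.close()
        }
        return idle;
      }

      boolean isOpen() {
        return open;
      }
    }

    final class IdleSessionMonitor {
      private final Set<IdleSession> sessions = ConcurrentHashMap.newKeySet();
      private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

      void register(IdleSession session) {
        sessions.add(session);
      }

      // Mirrors startTimeoutThread(): every periodMs, close and drop every
      // session that has been idle longer than timeoutMs.
      void start(long timeoutMs, long periodMs) {
        scheduler.scheduleAtFixedRate(() -> sessions.stream()
            .filter(s -> s.triggerTimeout(timeoutMs))
            .forEach(sessions::remove),
            0, periodMs, TimeUnit.MILLISECONDS);
      }

      void stop() {
        scheduler.shutdownNow();
      }
    }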
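Finally, the reason SparkSessionImpl grows a ReentrantReadWriteLock: open(), submit(), getMemoryAndCores() and isOpen() take the read lock so they can proceed concurrently, while close() and the timeout path take the write lock behind a cheap isOpen pre-check, so a session is never torn down while a submission is in flight. A compact sketch of that locking discipline with a hypothetical GuardedSession (again not Hive code; here the lock/unlock pair is simply kept together in one try/finally):

    import java.util.concurrent.locks.ReadWriteLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;
    import java.util.function.Supplier;

    final class GuardedSession {
      private final ReadWriteLock closeLock = new ReentrantReadWriteLock();
      private volatile boolean open = true;

      // Read-style operations (submit, isOpen, ...) share the read lock.
      <T> T withOpenSession(Supplier<T> op) {
        closeLock.readLock().lock();
        try {
          if (!open) {
            throw new IllegalStateException("Session is not open");
          }
          return op.get();
        } finally {
          closeLock.readLock().unlock();
        }
      }

      // close() takes the write lock, so it waits for in-flight read-locked work
      // and no new work can start once it begins; the isOpen checks keep repeated
      // close() calls cheap.
      void close() {
        if (open) {
          closeLock.writeLock().lock();
          try {
            if (open) {
              open = false; // release client resources here
            }
          } finally {
            closeLock.writeLock().unlock();
          }
        }
      }
    }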