commit d4c0dbb83f3b10ba7b742b3c68a311efdfdf9a8c
Author: Sahil Takiar
Date:   Mon Jul 16 10:26:21 2018 -0500

    HIVE-14162: Allow disabling of long running job on Hive On Spark On YARN

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 858c63011b..891d199a51 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -4291,6 +4291,12 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal
         "specified (default) then the spark-submit shell script is used to launch the Spark " +
         "app. If " + HIVE_SPARK_LAUNCHER_CLIENT + " is specified then Spark's " +
         "InProcessLauncher is used to programmatically launch the app."),
+    SPARK_SESSION_TIMEOUT("hive.spark.session.timeout", "30m", new TimeValidator(TimeUnit.SECONDS,
+        300L, true, null, true), "Amount of time the Spark Remote Driver should wait for " +
+        "a Spark job to be submitted before shutting down."),
+    SPARK_SESSION_TIMEOUT_PERIOD("hive.spark.session.timeout.period", "30s",
+        new TimeValidator(TimeUnit.SECONDS, 30L, true, null, true),
+        "How frequently to check for idle Spark sessions"),
     NWAYJOINREORDER("hive.reorder.nway.joins", true,
       "Runs reordering of tables within single n-way join (i.e.: picks streamtable)"),
     HIVE_MERGE_NWAY_JOINS("hive.merge.nway.joins", true,
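The two new settings can also be set programmatically on a HiveConf, the same way the test below configures them. A minimal sketch, with illustrative values rather than the defaults:

    HiveConf conf = new HiveConf();
    // Close an idle Hive-on-Spark session 10 minutes after the last query completes
    // (illustrative; the default is 30m and the validator above enforces a 300-second minimum).
    conf.setVar(HiveConf.ConfVars.SPARK_SESSION_TIMEOUT, "10m");
    // Check for idle sessions once a minute (the default and minimum is 30s).
    conf.setVar(HiveConf.ConfVars.SPARK_SESSION_TIMEOUT_PERIOD, "1m");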
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkSessionTimeout.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkSessionTimeout.java
new file mode 100644
index 0000000000..bb88642a43
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkSessionTimeout.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession;
+import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory;
+import org.apache.hadoop.hive.ql.session.SessionState;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.file.Paths;
+
+public class TestSparkSessionTimeout {
+
+  @Test
+  public void testSparkSessionTimeout() throws HiveException, InterruptedException {
+    HiveConf conf = new HiveConf();
+    conf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
+        SQLStdHiveAuthorizerFactory.class.getName());
+    conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+    conf.setVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "spark");
+    conf.set("spark.master", "local-cluster[1,2,1024]");
+    conf.set("spark.local.dir", Paths.get(System.getProperty("test.tmp.dir"),
+        "TestSparkSessionTimeout-local-dir").toString());
+    conf.setVar(HiveConf.ConfVars.SPARK_SESSION_TIMEOUT, "5s");
+    conf.setVar(HiveConf.ConfVars.SPARK_SESSION_TIMEOUT_PERIOD, "1s");
+
+    SessionState.start(conf);
+
+    Driver driver = null;
+
+    try {
+      driver = new Driver(new QueryState.Builder()
+          .withGenerateNewQueryId(true)
+          .withHiveConf(conf).build(),
+          null, null);
+
+      SparkSession sparkSession = SparkUtilities.getSparkSession(conf, SparkSessionManagerImpl
+          .getInstance());
+
+      Assert.assertEquals(0, driver.run("create table test (col int)").getResponseCode());
+      Assert.assertEquals(0, driver.run("select * from test order by col").getResponseCode());
+
+      Thread.sleep(10000);
+
+      Assert.assertFalse(sparkSession.isOpen());
+
+      Assert.assertEquals(0, driver.run("select * from test order by col").getResponseCode());
+    } finally {
+      if (driver != null) {
+        Assert.assertEquals(0, driver.run("drop table if exists test").getResponseCode());
+        driver.destroy();
+      }
+    }
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 78922f1e43..238fcfc3e6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -84,6 +84,7 @@
 import org.apache.hadoop.hive.ql.exec.TaskResult;
 import org.apache.hadoop.hive.ql.exec.TaskRunner;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession;
 import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
 import org.apache.hadoop.hive.ql.hooks.Entity;
 import org.apache.hadoop.hive.ql.hooks.Entity.Type;
@@ -542,6 +543,11 @@ private void compile(String command, boolean resetTaskIds, boolean deferClose) throws CommandProcessorResponse {
     String queryId = queryState.getQueryId();
 
+    SparkSession ss = SessionState.get().getSparkSession();
+    if (ss != null) {
+      ss.onQuerySubmission(queryId);
+    }
+
     if (ctx != null) {
       setTriggerContext(queryId);
     }
@@ -2537,6 +2543,10 @@ private void execute() throws CommandProcessorResponse {
       queryState.setNumModifiedRows(numModifiedRows);
       console.printInfo("Total MapReduce CPU Time Spent: " + Utilities.formatMsecToStr(totalCpu));
     }
+    SparkSession ss = SessionState.get().getSparkSession();
+    if (ss != null) {
+      ss.onQueryCompletion(queryId);
+    }
     lDrvState.stateLock.lock();
     try {
       lDrvState.driverState = executionError ? DriverState.ERROR : DriverState.EXECUTED;
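The final assertion in the test relies on a timed-out session being re-acquired transparently on the next query. A rough sketch of that re-acquisition, as a caller outside of Driver/SparkTask might do it, assuming (as the test implies) that SparkUtilities.getSparkSession returns an open session, reopening one if necessary:

    // Sketch only: mirrors what the test above relies on happening for the query
    // that runs after the idle timeout has closed the previous session.
    HiveConf conf = SessionState.get().getConf();
    SparkSession session = SessionState.get().getSparkSession();
    if (session == null || !session.isOpen()) {
      session = SparkUtilities.getSparkSession(conf, SparkSessionManagerImpl.getInstance());
    }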
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java
index f96a8f77ce..a0291a76a5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java
@@ -28,6 +28,7 @@
 import java.io.IOException;
 
 public interface SparkSession {
+
   /**
    * Initializes a Spark session for DAG execution.
    * @param conf Hive configuration.
@@ -75,4 +76,20 @@
    * Get an HDFS dir specific to the SparkSession
    * */
   Path getHDFSSessionDir() throws IOException;
+
+  /**
+   * Callback function that is invoked by the {@link org.apache.hadoop.hive.ql.Driver} when a
+   * query has completed.
+   *
+   * @param queryId the id of the query that completed
+   */
+  void onQueryCompletion(String queryId);
+
+  /**
+   * Callback function that is invoked by the {@link org.apache.hadoop.hive.ql.Driver} when a
+   * query has been submitted.
+   *
+   * @param queryId the id of the query that was submitted
+   */
+  void onQuerySubmission(String queryId);
 }
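Because these are new methods on the SparkSession interface, any other implementation has to provide them as well. A minimal no-op sketch for a hypothetical test double (the class name is made up and not part of this patch; it is declared abstract so the fragment compiles without repeating the rest of the interface):

    // Hypothetical helper for tests that do not care about idle timeouts.
    public abstract class NoOpQueryTrackingSparkSession implements SparkSession {

      @Override
      public void onQuerySubmission(String queryId) {
        // no-op: this implementation does not track active queries
      }

      @Override
      public void onQueryCompletion(String queryId) {
        // no-op: no idle-timeout bookkeeping
      }
    }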
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
index 0f2f0315a3..d6196f6894 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
@@ -19,8 +19,15 @@
 import java.io.IOException;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -28,6 +35,7 @@
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -49,6 +57,7 @@
 import com.google.common.base.Preconditions;
 
 public class SparkSessionImpl implements SparkSession {
+
   private static final Logger LOG = LoggerFactory.getLogger(SparkSession.class);
 
   private static final String SPARK_DIR = "_spark_session_dir";
@@ -65,12 +74,17 @@
   /** Pre-compiled error patterns. Shared between all Spark sessions */
   private static Map errorPatterns;
 
-  private HiveConf conf;
-  private boolean isOpen;
+  private volatile HiveConf conf;
+  private volatile boolean isOpen;
   private final String sessionId;
-  private HiveSparkClient hiveSparkClient;
-  private Path scratchDir;
+  private volatile HiveSparkClient hiveSparkClient;
+  private volatile Path scratchDir;
   private final Object dirLock = new Object();
+  private volatile long lastSparkJobCompletionTime;
+  private final Set activeJobs = Sets.newConcurrentHashSet();
+  private volatile Future timeoutFuture;
+  private volatile boolean queryCompleted;
+  private ReadWriteLock closeLock = new ReentrantReadWriteLock();
 
   public SparkSessionImpl() {
     sessionId = makeSessionId();
@@ -79,66 +93,111 @@ public SparkSessionImpl() {
 
   @Override
   public void open(HiveConf conf) throws HiveException {
-    LOG.info("Trying to open Hive on Spark session {}", sessionId);
-    this.conf = conf;
-    isOpen = true;
+    closeLock.readLock().lock();
     try {
-      hiveSparkClient = HiveSparkClientFactory.createHiveSparkClient(conf, sessionId);
-    } catch (Throwable e) {
-      // It's possible that user session is closed while creating Spark client.
-      HiveException he;
-      if (isOpen) {
-        he = getHiveException(e);
-      } else {
-        he = new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_CLOSED_SESSION, sessionId);
+      LOG.info("Trying to open Hive on Spark session {}", sessionId);
+      this.conf = conf;
+      isOpen = true;
+      try {
+        hiveSparkClient = HiveSparkClientFactory.createHiveSparkClient(conf, sessionId);
+      } catch (Throwable e) {
+        // It's possible that user session is closed while creating Spark client.
+        HiveException he;
+        if (isOpen) {
+          he = getHiveException(e);
+        } else {
+          he = new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_CLOSED_SESSION, sessionId);
+        }
+        throw he;
      }
-      throw he;
+      startTimeoutThread();
+      LOG.info("Hive on Spark session {} successfully opened", sessionId);
+    } finally {
+      closeLock.readLock().unlock();
     }
-    LOG.info("Hive on Spark session {} successfully opened", sessionId);
+  }
+
+  private void startTimeoutThread() {
+    long sessionTimeout = conf.getTimeVar(HiveConf.ConfVars.SPARK_SESSION_TIMEOUT, TimeUnit
+        .MILLISECONDS);
+    long sessionTimeoutPeriod = conf.getTimeVar(HiveConf.ConfVars.SPARK_SESSION_TIMEOUT_PERIOD,
+        TimeUnit.MILLISECONDS);
+    ScheduledExecutorService es = Executors.newSingleThreadScheduledExecutor();
+    timeoutFuture = es.scheduleAtFixedRate(() -> {
+      if (queryCompleted && activeJobs.isEmpty() && lastSparkJobCompletionTime > 0
+          && (System.currentTimeMillis() - lastSparkJobCompletionTime) > sessionTimeout) {
+        closeLock.writeLock().lock();
+        try {
+          if (queryCompleted && activeJobs.isEmpty() && lastSparkJobCompletionTime > 0
+              && (System.currentTimeMillis() - lastSparkJobCompletionTime) > sessionTimeout) {
+            LOG.warn("Closing Spark session because a Spark job has not been run in" +
+                " the past " + sessionTimeout / 1000 + " seconds");
+            close();
+          }
+        } finally {
+          closeLock.writeLock().unlock();
+        }
+      }
+    }, 0, sessionTimeoutPeriod, TimeUnit.MILLISECONDS);
   }
 
   @Override
   public SparkJobRef submit(DriverContext driverContext, SparkWork sparkWork) throws Exception {
-    Preconditions.checkState(isOpen, "Hive on Spark session is not open. Can't submit jobs.");
-    return hiveSparkClient.execute(driverContext, sparkWork);
+    closeLock.readLock().lock();
+    try {
+      Preconditions.checkState(isOpen, "Hive on Spark session is not open. Can't submit jobs.");
+      return hiveSparkClient.execute(driverContext, sparkWork);
+    } finally {
+      closeLock.readLock().unlock();
+    }
   }
 
   @Override
   public ObjectPair getMemoryAndCores() throws Exception {
-    SparkConf sparkConf = hiveSparkClient.getSparkConf();
-    int numExecutors = hiveSparkClient.getExecutorCount();
-    // at start-up, we may be unable to get number of executors
-    if (numExecutors <= 0) {
-      return new ObjectPair(-1L, -1);
-    }
-    int executorMemoryInMB = Utils.memoryStringToMb(
-        sparkConf.get("spark.executor.memory", "512m"));
-    double memoryFraction = 1.0 - sparkConf.getDouble("spark.storage.memoryFraction", 0.6);
-    long totalMemory = (long) (numExecutors * executorMemoryInMB * memoryFraction * 1024 * 1024);
-    int totalCores;
-    String masterURL = sparkConf.get("spark.master");
-    if (masterURL.startsWith("spark") || masterURL.startsWith("local")) {
-      totalCores = sparkConf.contains("spark.default.parallelism") ?
-          sparkConf.getInt("spark.default.parallelism", 1) :
-          hiveSparkClient.getDefaultParallelism();
-      totalCores = Math.max(totalCores, numExecutors);
-    } else {
-      int coresPerExecutor = sparkConf.getInt("spark.executor.cores", 1);
-      totalCores = numExecutors * coresPerExecutor;
-    }
-    totalCores = totalCores / sparkConf.getInt("spark.task.cpus", 1);
+    closeLock.readLock().lock();
+    try {
+      SparkConf sparkConf = hiveSparkClient.getSparkConf();
+      int numExecutors = hiveSparkClient.getExecutorCount();
+      // at start-up, we may be unable to get number of executors
+      if (numExecutors <= 0) {
+        return new ObjectPair(-1L, -1);
+      }
+      int executorMemoryInMB = Utils.memoryStringToMb(
+          sparkConf.get("spark.executor.memory", "512m"));
+      double memoryFraction = 1.0 - sparkConf.getDouble("spark.storage.memoryFraction", 0.6);
+      long totalMemory = (long) (numExecutors * executorMemoryInMB * memoryFraction * 1024 * 1024);
+      int totalCores;
+      String masterURL = sparkConf.get("spark.master");
+      if (masterURL.startsWith("spark") || masterURL.startsWith("local")) {
+        totalCores = sparkConf.contains("spark.default.parallelism") ?
+            sparkConf.getInt("spark.default.parallelism", 1) :
+            hiveSparkClient.getDefaultParallelism();
+        totalCores = Math.max(totalCores, numExecutors);
+      } else {
+        int coresPerExecutor = sparkConf.getInt("spark.executor.cores", 1);
+        totalCores = numExecutors * coresPerExecutor;
+      }
+      totalCores = totalCores / sparkConf.getInt("spark.task.cpus", 1);
 
-    long memoryPerTaskInBytes = totalMemory / totalCores;
-    LOG.info("Hive on Spark application currently has number of executors: " + numExecutors
-        + ", total cores: " + totalCores + ", memory per executor: "
-        + executorMemoryInMB + " mb, memoryFraction: " + memoryFraction);
-    return new ObjectPair(Long.valueOf(memoryPerTaskInBytes),
-        Integer.valueOf(totalCores));
+      long memoryPerTaskInBytes = totalMemory / totalCores;
+      LOG.info("Hive on Spark application currently has number of executors: " + numExecutors
+          + ", total cores: " + totalCores + ", memory per executor: "
+          + executorMemoryInMB + " mb, memoryFraction: " + memoryFraction);
+      return new ObjectPair(Long.valueOf(memoryPerTaskInBytes),
+          Integer.valueOf(totalCores));
+    } finally {
+      closeLock.readLock().unlock();
+    }
   }
 
   @Override
   public boolean isOpen() {
-    return isOpen;
+    closeLock.readLock().lock();
+    try {
+      return isOpen;
+    } finally {
+      closeLock.readLock().unlock();
+    }
   }
 
   @Override
@@ -153,18 +212,28 @@ public String getSessionId() {
 
   @Override
   public void close() {
-    LOG.info("Trying to close Hive on Spark session {}", sessionId);
-    isOpen = false;
-    if (hiveSparkClient != null) {
+    if (isOpen) {
+      closeLock.writeLock().lock();
       try {
-        hiveSparkClient.close();
-        LOG.info("Hive on Spark session {} successfully closed", sessionId);
-        cleanScratchDir();
-      } catch (IOException e) {
-        LOG.error("Failed to close Hive on Spark session (" + sessionId + ")", e);
+        LOG.info("Trying to close Hive on Spark session {}", sessionId);
+        isOpen = false;
+        if (hiveSparkClient != null) {
+          try {
+            hiveSparkClient.close();
+            LOG.info("Hive on Spark session {} successfully closed", sessionId);
+            cleanScratchDir();
+          } catch (IOException e) {
+            LOG.error("Failed to close Hive on Spark session (" + sessionId + ")", e);
+          }
+        }
+        hiveSparkClient = null;
+        queryCompleted = false;
+        lastSparkJobCompletionTime = 0;
+        timeoutFuture.cancel(false);
+      } finally {
+        closeLock.writeLock().unlock();
       }
     }
-    hiveSparkClient = null;
   }
 
   private Path createScratchDir() throws IOException {
@@ -260,6 +329,18 @@ public Path getHDFSSessionDir() throws IOException {
     return scratchDir;
   }
 
+  @Override
+  public void onQuerySubmission(String queryId) {
+    activeJobs.add(queryId);
+  }
+
+  @Override
+  public void onQueryCompletion(String queryId) {
+    if (!queryCompleted) queryCompleted = true;
+    activeJobs.remove(queryId);
+    lastSparkJobCompletionTime = System.currentTimeMillis();
+  }
+
   public static String makeSessionId() {
     return UUID.randomUUID().toString();
   }
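Taken together, onQueryCompletion() starts the idle clock and the scheduled check in startTimeoutThread() closes the session once that clock exceeds hive.spark.session.timeout, provided no queries are active. A rough sketch of the resulting bound, using the patch's default values (variable names here are local to the example, and this assumes the checker thread is not delayed):

    // An idle session is closed no earlier than `timeoutMs` and no later than
    // roughly `timeoutMs + periodMs` after the last query completed.
    long lastCompletion = System.currentTimeMillis();  // what onQueryCompletion() records
    long timeoutMs = TimeUnit.MINUTES.toMillis(30);    // hive.spark.session.timeout default
    long periodMs = TimeUnit.SECONDS.toMillis(30);     // hive.spark.session.timeout.period default
    long earliestClose = lastCompletion + timeoutMs;
    long latestClose = lastCompletion + timeoutMs + periodMs;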