commit d10a21b3eb801af81b494499a8c58cea075dd29d
Author: Sahil Takiar
Date:   Thu Sep 13 17:43:41 2018 -0700

    HIVE-20519: Remove 30m min value for hive.spark.session.timeout

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index aa58d7445c..e30d377d49 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -4314,9 +4314,11 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         "app. If " + HIVE_SPARK_LAUNCHER_CLIENT + " is specified then Spark's " +
         "InProcessLauncher is used to programmatically launch the app."),
     SPARK_SESSION_TIMEOUT("hive.spark.session.timeout", "30m", new TimeValidator(TimeUnit.MINUTES,
-        30L, true, null, true), "Amount of time the Spark Remote Driver should wait for " +
-        " a Spark job to be submitted before shutting down. Minimum value is 30 minutes"),
-    SPARK_SESSION_TIMEOUT_PERIOD("hive.spark.session.timeout.period", "60s",
+        0L, true, null, true), "Amount of time the Spark Remote Driver should wait for " +
+        " a Spark job to be submitted before shutting down. The Remote Driver will always " +
+        "wait for at least one Hive query to complete before it times out. A value of 0 " +
+        "shuts down the driver immediately after one query has completed."),
+    SPARK_SESSION_TIMEOUT_CHECK_PERIOD("hive.spark.session.timeout.check.period", "60s",
         new TimeValidator(TimeUnit.SECONDS, 60L, true, null, true),
         "How frequently to check for idle Spark sessions. Minimum value is 60 seconds."),
     NWAYJOINREORDER("hive.reorder.nway.joins", true,
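With the 30-minute floor removed, the timeout can be driven arbitrarily low. A minimal sketch, not part of the patch, of setting the relaxed timeout and the renamed check period from Java; these are exactly the setVar calls the updated test below makes, with illustrative values:

    HiveConf conf = new HiveConf();
    // 0 is now legal: the Remote Driver shuts down as soon as one query has
    // completed and no other query is active.
    conf.setVar(HiveConf.ConfVars.SPARK_SESSION_TIMEOUT, "0s");
    conf.setVar(HiveConf.ConfVars.SPARK_SESSION_TIMEOUT_CHECK_PERIOD, "60s");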
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkSessionTimeout.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkSessionTimeout.java
index c887297bc2..45eac8b4d7 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkSessionTimeout.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkSessionTimeout.java
@@ -29,7 +29,7 @@
 import org.junit.Test;
 
 import java.io.File;
-import java.net.MalformedURLException;
+import java.io.IOException;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.List;
@@ -42,7 +42,7 @@ public class TestSparkSessionTimeout {
 
   @Test
-  public void testSparkSessionTimeout() throws HiveException, InterruptedException, MalformedURLException {
+  public void testSparkSessionTimeout() throws HiveException, InterruptedException, IOException, ExecutionException {
     String confDir = "../../data/conf/spark/standalone/hive-site.xml";
     HiveConf.setHiveSiteLocation(new File(confDir).toURI().toURL());
@@ -50,54 +50,32 @@ public void testSparkSessionTimeout() throws HiveException, InterruptedException
     conf.set("spark.local.dir", Paths.get(System.getProperty("test.tmp.dir"),
         "TestSparkSessionTimeout-testSparkSessionTimeout-local-dir").toString());
 
-    SessionState.start(conf);
+    runTestSparkSessionTimeout(conf, "5s", 10);
 
-    runTestSparkSessionTimeout(conf);
-  }
-
-  @Test
-  public void testMultiSessionSparkSessionTimeout() throws InterruptedException,
-      ExecutionException {
-    List<Future<Void>> futures = new ArrayList<>();
-    ExecutorService es = Executors.newFixedThreadPool(10);
-    for (int i = 0; i < 10; i++) {
-      futures.add(es.submit(() -> {
-        String confDir = "../../data/conf/spark/local/hive-site.xml";
-        HiveConf.setHiveSiteLocation(new File(confDir).toURI().toURL());
-
-        HiveConf conf = new HiveConf();
-        conf.setBoolVar(HiveConf.ConfVars.SPARK_OPTIMIZE_SHUFFLE_SERDE, false);
-        conf.set("spark.local.dir", Paths.get(System.getProperty("test.tmp.dir"),
-            "TestSparkSessionTimeout-testMultiSparkSessionTimeout-local-dir").toString());
+    // testSparkSessionImmediateTimeout
+    confDir = "../../data/conf/spark/standalone/hive-site.xml";
+    HiveConf.setHiveSiteLocation(new File(confDir).toURI().toURL());
 
-        SessionState.start(conf);
+    conf = new HiveConf();
+    conf.set("spark.local.dir", Paths.get(System.getProperty("test.tmp.dir"),
+        "TestSparkSessionTimeout-testSparkSessionTimeout-local-dir").toString());
 
-        runTestSparkSessionTimeout(conf);
-        return null;
-      }));
-    }
-    for (Future<Void> future : futures) {
-      future.get();
-    }
-  }
+    runTestSparkSessionTimeout(conf, "0s", 5);
 
-  @Test
-  public void testMultiSparkSessionTimeout() throws ExecutionException, InterruptedException {
+    // testMultiSessionSparkSessionTimeout
     List<Future<Void>> futures = new ArrayList<>();
-    ExecutorService es = Executors.newFixedThreadPool(10);
-    for (int i = 0; i < 10; i++) {
+    ExecutorService es = Executors.newFixedThreadPool(5);
+    for (int i = 0; i < 5; i++) {
       futures.add(es.submit(() -> {
-        String confDir = "../../data/conf/spark/local/hive-site.xml";
-        HiveConf.setHiveSiteLocation(new File(confDir).toURI().toURL());
+        String localConfDir = "../../data/conf/spark/local/hive-site.xml";
+        HiveConf.setHiveSiteLocation(new File(localConfDir).toURI().toURL());
 
-        HiveConf conf = new HiveConf();
-        conf.setBoolVar(HiveConf.ConfVars.SPARK_OPTIMIZE_SHUFFLE_SERDE, false);
-        conf.set("spark.local.dir", Paths.get(System.getProperty("test.tmp.dir"),
+        HiveConf multiSessionConf = new HiveConf();
+        multiSessionConf.setBoolVar(HiveConf.ConfVars.SPARK_OPTIMIZE_SHUFFLE_SERDE, false);
+        multiSessionConf.set("spark.local.dir", Paths.get(System.getProperty("test.tmp.dir"),
             "TestSparkSessionTimeout-testMultiSparkSessionTimeout-local-dir").toString());
 
-        SessionState.start(conf);
-
-        runTestSparkSessionTimeout(conf);
+        runTestSparkSessionTimeout(multiSessionConf, "15s", 30);
         return null;
       }));
     }
@@ -106,10 +84,12 @@ public void testMultiSparkSessionTimeout() throws ExecutionException, Interrupte
     }
   }
 
-  private void runTestSparkSessionTimeout(HiveConf conf) throws HiveException,
-      InterruptedException {
-    conf.setVar(HiveConf.ConfVars.SPARK_SESSION_TIMEOUT, "5s");
-    conf.setVar(HiveConf.ConfVars.SPARK_SESSION_TIMEOUT_PERIOD, "1s");
+  private void runTestSparkSessionTimeout(HiveConf conf, String timeout,
+      long sleepTime) throws HiveException, InterruptedException, IOException {
+    conf.setVar(HiveConf.ConfVars.SPARK_SESSION_TIMEOUT, timeout);
+    conf.setVar(HiveConf.ConfVars.SPARK_SESSION_TIMEOUT_CHECK_PERIOD, "1s");
+
+    SessionState ss = SessionState.start(conf);
 
     String tableName = "test" + UUID.randomUUID().toString().replace("-", "");
@@ -129,7 +109,7 @@ private void runTestSparkSessionTimeout(HiveConf conf) throws HiveException,
       Assert.assertEquals(0,
           driver.run("select * from " + tableName + " order by col").getResponseCode());
 
-      Thread.sleep(10000);
+      Thread.sleep(sleepTime * 1000);
 
       Assert.assertFalse(sparkSession.isOpen());
@@ -140,6 +120,7 @@ private void runTestSparkSessionTimeout(HiveConf conf) throws HiveException,
       Assert.assertEquals(0, driver.run("drop table if exists " + tableName).getResponseCode());
       driver.destroy();
     }
+    ss.close();
   }
 }
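The updated test still waits for the session to time out with fixed Thread.sleep() calls sized to the configured timeout. A hedged alternative sketch, not part of the patch: poll SparkSession#isOpen() until it flips, with a deadline. The helper name and timings are invented for illustration:

    // Hypothetical helper: wait for the session to time itself out instead of
    // sleeping for a fixed interval.
    private static void awaitSessionClosed(SparkSession sparkSession, long deadlineMs)
        throws InterruptedException {
      long giveUpAt = System.currentTimeMillis() + deadlineMs;
      while (sparkSession.isOpen()) {
        if (System.currentTimeMillis() > giveUpAt) {
          throw new AssertionError("Spark session still open after " + deadlineMs + " ms");
        }
        Thread.sleep(100);  // re-poll well below the 1s timeout check period
      }
    }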
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index dad2035362..e37daa0de4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -544,12 +544,6 @@ private void compile(String command, boolean resetTaskIds, boolean deferClose) t
     LockedDriverState.setLockedDriverState(lDrvState);
 
     String queryId = queryState.getQueryId();
-
-    SparkSession ss = SessionState.get().getSparkSession();
-    if (ss != null) {
-      ss.onQuerySubmission(queryId);
-    }
-
     if (ctx != null) {
       setTriggerContext(queryId);
     }
@@ -572,6 +566,11 @@ private void compile(String command, boolean resetTaskIds, boolean deferClose) t
 
     try {
+      SparkSession ss = SessionState.get().getSparkSession();
+      if (ss != null) {
+        ss.onQuerySubmission(queryId);
+      }
+
       // Initialize the transaction manager. This must be done before analyze is called.
       if (initTxnMgr != null) {
         queryTxnMgr = initTxnMgr;
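The Driver change only relocates the onQuerySubmission() call into the compile try block; the matching onQueryCompletion() call is made elsewhere in the Driver and is not visible in this diff. A sketch of the pairing contract the timeout logic depends on, with the try/finally shape assumed rather than taken from the patch:

    SparkSession ss = SessionState.get().getSparkSession();
    if (ss != null) {
      ss.onQuerySubmission(queryId);    // adds queryId to activeQueries
    }
    try {
      // ... compile and execute the query ...
    } finally {
      if (ss != null) {
        ss.onQueryCompletion(queryId);  // removes queryId; with a 0 timeout
                                        // this may close the session right here
      }
    }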
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
index 6a8b42e926..330c957372 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -108,7 +109,7 @@
   /**
    * A {@link Set} of currently running queries. Each job is identified by its query id.
    */
-  private final Set<String> activeJobs = Sets.newConcurrentHashSet();
+  private final Set<String> activeQueries = Sets.newConcurrentHashSet();
 
   /**
    * True if at least a single query has been run by this session, false otherwise.
@@ -222,7 +223,7 @@ public void close() {
     if (isOpen) {
       closeLock.writeLock().lock();
       try {
-        if (isOpen) {
+        if (activeQueries.isEmpty() && isOpen) {
           LOG.info("Trying to close Hive on Spark session {}", sessionId);
           isOpen = false;
           if (hiveSparkClient != null) {
@@ -237,6 +238,11 @@ public void close() {
           hiveSparkClient = null;
           queryCompleted = false;
           lastSparkJobCompletionTime = 0;
+        } else if (!isOpen) {
+          LOG.info("Ignoring attempt to close Spark session because it has already been closed");
+        } else if (!activeQueries.isEmpty()) {
+          LOG.info("Ignoring attempt to close Spark session because a Spark job was submitted " +
+              "before the close could complete");
         }
       } finally {
         closeLock.writeLock().unlock();
@@ -339,7 +345,7 @@ public Path getHDFSSessionDir() throws IOException {
 
   @Override
   public void onQuerySubmission(String queryId) {
-    activeJobs.add(queryId);
+    activeQueries.add(queryId);
   }
 
   /**
@@ -347,12 +353,12 @@ public void onQuerySubmission(String queryId) {
    */
   @Override
   public boolean triggerTimeout(long sessionTimeout) {
-    if (hasTimedOut(queryCompleted, activeJobs, lastSparkJobCompletionTime, sessionTimeout)) {
+    if (hasTimedOut(queryCompleted, activeQueries, lastSparkJobCompletionTime, sessionTimeout)) {
       closeLock.writeLock().lock();
       try {
-        if (hasTimedOut(queryCompleted, activeJobs, lastSparkJobCompletionTime, sessionTimeout)) {
-          LOG.warn("Closing Spark session " + getSessionId() + " because a Spark job has not " +
-              "been run in the past " + sessionTimeout / 1000 + " seconds");
+        if (hasTimedOut(queryCompleted, activeQueries, lastSparkJobCompletionTime, sessionTimeout)) {
+          LOG.warn("Attempting to close Spark session " + getSessionId() + " because a Spark job " +
+              "has not been run in the past " + sessionTimeout / 1000 + " second(s)");
           close();
           return true;
         }
@@ -369,10 +375,10 @@ public boolean triggerTimeout(long sessionTimeout) {
    * query, (2) there can be no actively running Spark jobs, and (3) the last completed Spark job
    * must have been more than sessionTimeout seconds ago.
    */
-  private static boolean hasTimedOut(boolean queryCompleted, Set<String> activeJobs,
+  private static boolean hasTimedOut(boolean queryCompleted, Set<String> activeQueries,
       long lastSparkJobCompletionTime, long sessionTimeout) {
     return queryCompleted &&
-        activeJobs.isEmpty() &&
+        activeQueries.isEmpty() &&
         lastSparkJobCompletionTime > 0 &&
         (System.currentTimeMillis() - lastSparkJobCompletionTime) > sessionTimeout;
   }
@@ -387,12 +393,56 @@ public void onQueryCompletion(String queryId) {
     if (!queryCompleted) {
       queryCompleted = true;
     }
-    activeJobs.remove(queryId);
+    activeQueries.remove(queryId);
     lastSparkJobCompletionTime = System.currentTimeMillis();
+
+    long sessionTimeout = conf.getTimeVar(HiveConf.ConfVars.SPARK_SESSION_TIMEOUT,
+        TimeUnit.MILLISECONDS);
+
+    if (sessionTimeout == 0 && activeQueries.isEmpty()) {
+      LOG.warn("Attempting to close Spark session " + getSessionId() + " because " +
+          HiveConf.ConfVars.SPARK_SESSION_TIMEOUT.varname + " is set to " + 0);
+      close();
+    }
   }
 
   @VisibleForTesting
   HiveSparkClient getHiveSparkClient() {
     return hiveSparkClient;
   }
+
+  @VisibleForTesting
+  void setHiveSparkClient(HiveSparkClient hiveSparkClient) {
+    this.hiveSparkClient = hiveSparkClient;
+  }
+
+  @VisibleForTesting
+  void setHiveConf(HiveConf hiveConf) {
+    conf = hiveConf;
+  }
+
+  @VisibleForTesting
+  boolean getIsOpen() {
+    return this.isOpen;
+  }
+
+  @VisibleForTesting
+  void setIsOpen(boolean isOpen) {
+    this.isOpen = isOpen;
+  }
+
+  @VisibleForTesting
+  long getLastSparkJobCompletionTime() {
+    return this.lastSparkJobCompletionTime;
+  }
+
+  @VisibleForTesting
+  Set<String> getActiveQueries() {
+    return this.activeQueries;
+  }
+
+  @VisibleForTesting
+  boolean getQueryCompleted() {
+    return this.queryCompleted;
+  }
 }
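Both close() and triggerTimeout() above follow a check-lock-recheck idiom: a cheap unsynchronized test, then a re-check under the write lock, so the common case stays lock-free while a close cannot race with a concurrent query submission. A self-contained sketch of the same idiom in isolation; all names here are illustrative and mirror the shape of SparkSessionImpl, not its API:

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    final class IdleCloser {
      private final ReentrantReadWriteLock closeLock = new ReentrantReadWriteLock();
      private volatile boolean open = true;
      private volatile long lastCompletionMs;

      void onQueryCompletion() {
        lastCompletionMs = System.currentTimeMillis();
      }

      boolean triggerTimeout(long timeoutMs) {
        if (hasTimedOut(timeoutMs)) {          // cheap, unsynchronized first check
          closeLock.writeLock().lock();
          try {
            if (hasTimedOut(timeoutMs)) {      // re-check under the write lock: a
              open = false;                    // submission may have raced in between
              return true;
            }
          } finally {
            closeLock.writeLock().unlock();
          }
        }
        return false;
      }

      private boolean hasTimedOut(long timeoutMs) {
        return open && lastCompletionMs > 0
            && System.currentTimeMillis() - lastCompletionMs > timeoutMs;
      }
    }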
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java
index 8dae54d7bd..fc790ec95a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java
@@ -175,16 +175,20 @@ public void shutdown() {
   private void startTimeoutThread() {
     long sessionTimeout = conf.getTimeVar(HiveConf.ConfVars.SPARK_SESSION_TIMEOUT,
         TimeUnit.MILLISECONDS);
-    long sessionTimeoutPeriod = conf.getTimeVar(HiveConf.ConfVars.SPARK_SESSION_TIMEOUT_PERIOD,
+    long sessionTimeoutPeriod = conf.getTimeVar(HiveConf.ConfVars.SPARK_SESSION_TIMEOUT_CHECK_PERIOD,
         TimeUnit.MILLISECONDS);
     ScheduledExecutorService es = Executors.newSingleThreadScheduledExecutor();
-    // Schedules a thread that does the following: iterates through all the active SparkSessions
-    // and calls #triggerTimeout(long) on each one. If #triggerTimeout(long) returns true, then
-    // the SparkSession is removed from the set of active sessions managed by this class.
-    timeoutFuture = es.scheduleAtFixedRate(() -> createdSessions.stream()
-        .filter(sparkSession -> sparkSession.triggerTimeout(sessionTimeout))
-        .forEach(createdSessions::remove),
-        0, sessionTimeoutPeriod, TimeUnit.MILLISECONDS);
+    // Only schedule the thread if the session timeout is greater than 0. If the timeout is equal
+    // to 0 then the session will be timed out by SparkSession itself as soon as a query completes.
+    if (sessionTimeout > 0) {
+      // Schedules a thread that does the following: iterates through all the active SparkSessions
+      // and calls #triggerTimeout(long) on each one. If #triggerTimeout(long) returns true, then
+      // the SparkSession is removed from the set of active sessions managed by this class.
+      timeoutFuture = es.scheduleAtFixedRate(() -> createdSessions.stream()
+          .filter(sparkSession -> sparkSession.triggerTimeout(sessionTimeout))
+          .forEach(createdSessions::remove),
+          sessionTimeout, sessionTimeoutPeriod, TimeUnit.MILLISECONDS);
+    }
   }
 }
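Two behavioral details of the rewritten startTimeoutThread() are worth noting: the sweeper is skipped entirely when the timeout is 0 (onQueryCompletion() closes the session directly in that case), and the initial delay is now sessionTimeout rather than 0, since no session can possibly time out sooner than that. A self-contained sketch of the same wiring; the class, field names, and removeIf formulation are illustrative stand-ins, not the committed code:

    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession;

    final class TimeoutSweeper {
      private final Set<SparkSession> createdSessions = ConcurrentHashMap.newKeySet();
      private final ScheduledExecutorService es = Executors.newSingleThreadScheduledExecutor();

      void start(long sessionTimeoutMs, long checkPeriodMs) {
        // Skip the sweeper when the timeout is 0: the session closes itself on
        // query completion, so there is nothing to poll for.
        if (sessionTimeoutMs > 0) {
          // removeIf is equivalent to the stream().filter().forEach(remove) above.
          es.scheduleAtFixedRate(
              () -> createdSessions.removeIf(s -> s.triggerTimeout(sessionTimeoutMs)),
              sessionTimeoutMs, checkPeriodMs, TimeUnit.MILLISECONDS);
        }
      }
    }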
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionImplTimeout.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionImplTimeout.java
new file mode 100644
index 0000000000..3a76409091
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionImplTimeout.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark.session;
+
+import static org.mockito.Mockito.mock;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClient;
+import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
+import org.apache.hadoop.hive.ql.plan.SparkWork;
+import org.apache.hadoop.hive.ql.session.SessionState;
+
+import org.apache.spark.SparkConf;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+
+public class TestSparkSessionImplTimeout {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestSparkSessionImplTimeout.class);
+
+  /**
+   * Tests that all variables tracked for the timeout logic are properly updated
+   */
+  @Test
+  public void testSparkSessionTimeout() throws Exception {
+    SparkSessionImpl sparkSession = createSparkSessionImpl();
+
+    String queryId = "1";
+    sparkSession.onQuerySubmission(queryId);
+
+    Assert.assertTrue(sparkSession.isOpen());
+    Assert.assertEquals(sparkSession.getActiveQueries().size(), 1);
+    Assert.assertFalse(sparkSession.getQueryCompleted());
+
+    sparkSession.onQueryCompletion(queryId);
+    Assert.assertEquals(sparkSession.getActiveQueries().size(), 0);
+    Assert.assertTrue(sparkSession.getQueryCompleted());
+    Assert.assertTrue(sparkSession.getLastSparkJobCompletionTime() > 0);
+
+    sparkSession.close();
+
+    Assert.assertFalse(sparkSession.isOpen());
+  }
+
+  /**
+   * Tests that a timeout triggered before any query has run does not cause the session to be
+   * closed
+   */
+  @Test
+  public void testTimeoutBeforeQuery() {
+    SparkSessionImpl sparkSession = createSparkSessionImpl();
+
+    sparkSession.triggerTimeout(-1);
+
+    Assert.assertTrue(sparkSession.isOpen());
+  }
+
+  /**
+   * Tests that a timeout triggered while a query is running does not cause the session to be
+   * closed
+   */
+  @Test
+  public void testTimeoutDuringQuery() throws Exception {
+    SparkSessionImpl sparkSession = createSparkSessionImpl();
+
+    sparkSession.onQuerySubmission("1");
+
+    sparkSession.triggerTimeout(-1);
+
+    Assert.assertTrue(sparkSession.isOpen());
+  }
+
+  /**
+   * Tests that a timeout triggered after one query has completed causes the session to be closed
+   */
+  @Test
+  public void testTimeoutAfterQuery() throws Exception {
+    SparkSessionImpl sparkSession = createSparkSessionImpl();
+
+    String queryId = "1";
+    sparkSession.onQuerySubmission(queryId);
+
+    sparkSession.onQueryCompletion(queryId);
+
+    sparkSession.triggerTimeout(-1);
+
+    Assert.assertFalse(sparkSession.isOpen());
+  }
+
+  /**
+   * Tests that a timeout triggered after one query has completed, but while another query is
+   * running, does not cause the session to be closed
+   */
+  @Test
+  public void testTimeoutAfterMultipleQueriesDuringQuery() throws Exception {
+    SparkSessionImpl sparkSession = createSparkSessionImpl();
+
+    String queryId = "1";
+    sparkSession.onQuerySubmission(queryId);
+    sparkSession.onQueryCompletion(queryId);
+
+    queryId = "2";
+    sparkSession.onQuerySubmission(queryId);
+
+    sparkSession.triggerTimeout(-1);
+
+    Assert.assertTrue(sparkSession.isOpen());
+  }
+
+  /**
+   * Tests that submitting a query while a session is being closed results in a valid state
+   */
+  @Test
+  public void testSubmitQueryDuringClose() throws Exception {
+    SparkSessionImpl sparkSession = createSparkSessionImpl();
+
+    // Run a single query first to avoid the "has-one-query-completed" check
+    String queryId = "1";
+    sparkSession.onQuerySubmission(queryId);
+    sparkSession.onQueryCompletion(queryId);
+
+    ReentrantLock lock = new ReentrantLock();
+    Condition condition = lock.newCondition();
+
+    sparkSession.setHiveSparkClient(new TestHiveSparkClient(lock, condition));
+
+    // This should cause the close method of TestHiveSparkClient to run
+    FutureTask<Void> triggerTimeoutFuture = new FutureTask<>(() -> {
+      sparkSession.triggerTimeout(-1);
+      return null;
+    });
+    new Thread(triggerTimeoutFuture).start();
+
+    Thread.sleep(1000);
+
+    // isOpen should be marked as false
+    Assert.assertFalse(sparkSession.getIsOpen());
+
+    sparkSession.onQuerySubmission("2");
+    Assert.assertEquals(sparkSession.getActiveQueries().size(), 1);
+
+    FutureTask<Boolean> isOpenFuture = new FutureTask<>(sparkSession::isOpen);
+    new Thread(isOpenFuture).start();
+
+    Thread.sleep(1000);
+
+    // isOpen should block until the call to close() has completed
+    Assert.assertFalse(isOpenFuture.isDone());
+
+    // Unblock the close() thread
+    lock.lock();
+    try {
+      condition.signal();
+    } finally {
+      lock.unlock();
+    }
+
+    triggerTimeoutFuture.get(10, TimeUnit.SECONDS);
+
+    // The isOpen thread should report that the session was closed
+    Assert.assertFalse(isOpenFuture.get(10, TimeUnit.SECONDS));
+  }
+
+  private SparkSessionImpl createSparkSessionImpl() {
+    HiveConf hiveConf = new HiveConf();
+    SessionState.start(hiveConf);
+
+    SparkSessionImpl sparkSession = new SparkSessionImpl(UUID.randomUUID().toString());
+
+    HiveSparkClient mockHiveSparkClient = mock(HiveSparkClient.class);
+    sparkSession.setHiveSparkClient(mockHiveSparkClient);
+
+    sparkSession.setHiveConf(hiveConf);
+    sparkSession.setIsOpen(true);
+
+    Assert.assertTrue(sparkSession.getActiveQueries().isEmpty());
+    Assert.assertFalse(sparkSession.getQueryCompleted());
+
+    return sparkSession;
+  }
+
+  private static final class TestHiveSparkClient implements HiveSparkClient {
+
+    private final Lock lock;
+    private final Condition condition;
+
+    private TestHiveSparkClient(Lock lock, Condition condition) {
+      this.lock = lock;
+      this.condition = condition;
+    }
+
+    @Override
+    public SparkJobRef execute(DriverContext driverContext, SparkWork sparkWork) throws Exception {
+      lock.lock();
+      try {
+        condition.await();
+      } finally {
+        lock.unlock();
+      }
+      return null;
+    }
+
+    @Override
+    public SparkConf getSparkConf() {
+      return null;
+    }
+
+    @Override
+    public int getExecutorCount() throws Exception {
+      return 0;
+    }
+
+    @Override
+    public int getDefaultParallelism() throws Exception {
+      return 0;
+    }
+
+    @Override
+    public void close() throws IOException {
+      lock.lock();
+      try {
+        condition.await();
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      } finally {
+        lock.unlock();
+      }
+    }
+  }
+}
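The TestHiveSparkClient above parks close() (and execute()) on a Condition so that testSubmitQueryDuringClose() can deterministically interleave a query submission with an in-flight close. A self-contained sketch of that await/signal handshake in isolation, using only the JDK; it is illustrative and not part of the patch (spurious wakeups are ignored for brevity):

    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;

    public final class AwaitSignalDemo {
      public static void main(String[] args) throws InterruptedException {
        Lock lock = new ReentrantLock();
        Condition condition = lock.newCondition();

        Thread closer = new Thread(() -> {
          lock.lock();
          try {
            condition.await();          // parks here, like TestHiveSparkClient.close()
            System.out.println("close() finished");
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          } finally {
            lock.unlock();
          }
        });
        closer.start();

        Thread.sleep(500);              // let the closer park first
        lock.lock();
        try {
          condition.signal();           // the test does this to release close()
        } finally {
          lock.unlock();
        }
        closer.join();
      }
    }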