commit 6430de17aaac13308b292e979a60d8a5027d0565
Author: Sahil Takiar
Date:   Thu Sep 13 17:43:41 2018 -0700

    HIVE-20519: Remove 30m min value for hive.spark.session.timeout

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index ed6d3d80e3..b81b9ccaf9 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -4403,9 +4403,11 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         "app. If " + HIVE_SPARK_LAUNCHER_CLIENT + " is specified then Spark's " +
         "InProcessLauncher is used to programmatically launch the app."),
     SPARK_SESSION_TIMEOUT("hive.spark.session.timeout", "30m", new TimeValidator(TimeUnit.MINUTES,
-        30L, true, null, true), "Amount of time the Spark Remote Driver should wait for " +
-        " a Spark job to be submitted before shutting down. Minimum value is 30 minutes"),
-    SPARK_SESSION_TIMEOUT_PERIOD("hive.spark.session.timeout.period", "60s",
+        0L, true, null, true), "Amount of time the Spark Remote Driver should wait for " +
+        " a Spark job to be submitted before shutting down. The Remote Driver will always " +
+        "wait for at least one Hive query to complete before it times out. A value of 0 " +
+        "shuts down the driver immediately after one query has completed."),
+    SPARK_SESSION_TIMEOUT_CHECK_PERIOD("hive.spark.session.timeout.check.period", "60s",
         new TimeValidator(TimeUnit.SECONDS, 60L, true, null, true),
         "How frequently to check for idle Spark sessions. Minimum value is 60 seconds."),
     NWAYJOINREORDER("hive.reorder.nway.joins", true,
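For illustration, a minimal sketch (not part of the patch; the class name is hypothetical) of how the two settings interact after this change, using only the HiveConf accessors that appear in the hunks below:

    import java.util.concurrent.TimeUnit;

    import org.apache.hadoop.hive.conf.HiveConf;

    public class SparkSessionTimeoutConfigSketch {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // Any duration >= 0 is now accepted; the old validator rejected anything below 30m.
        conf.setVar(HiveConf.ConfVars.SPARK_SESSION_TIMEOUT, "0s");

        long timeoutMs = conf.getTimeVar(HiveConf.ConfVars.SPARK_SESSION_TIMEOUT,
            TimeUnit.MILLISECONDS);
        if (timeoutMs == 0) {
          // With a zero timeout, SparkSessionImpl#onQueryCompletion closes the session as
          // soon as the last active query finishes; no background check thread is scheduled.
          System.out.println("Driver shuts down immediately after the last query completes");
        } else {
          // Otherwise SparkSessionManagerImpl polls for idle sessions every
          // hive.spark.session.timeout.check.period (default and minimum: 60s).
          long checkPeriodMs = conf.getTimeVar(
              HiveConf.ConfVars.SPARK_SESSION_TIMEOUT_CHECK_PERIOD, TimeUnit.MILLISECONDS);
          System.out.println("Idle sessions are checked every " + checkPeriodMs + " ms");
        }
      }
    }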
"TestSparkSessionTimeout-testSparkSessionTimeout-local-dir").toString()); + + runTestSparkSessionTimeout(conf, "0s", 5, 1); + + // testMultiSessionSparkSessionTimeout List> futures = new ArrayList<>(); - ExecutorService es = Executors.newFixedThreadPool(10); - for (int i = 0; i < 10; i++) { + ExecutorService es = Executors.newFixedThreadPool(5); + for (int i = 0; i < 5; i++) { futures.add(es.submit(() -> { - String confDir = "../../data/conf/spark/local/hive-site.xml"; - HiveConf.setHiveSiteLocation(new File(confDir).toURI().toURL()); + String localConfDir = "../../data/conf/spark/local/hive-site.xml"; + HiveConf.setHiveSiteLocation(new File(localConfDir).toURI().toURL()); - HiveConf conf = new HiveConf(); - conf.setBoolVar(HiveConf.ConfVars.SPARK_OPTIMIZE_SHUFFLE_SERDE, false); - conf.set("spark.local.dir", Paths.get(System.getProperty("test.tmp.dir"), - "TestSparkSessionTimeout-testMultiSessionSparkSessionTimeout-local-dir").toString()); + HiveConf multiSessionConf = new HiveConf(); + multiSessionConf.setBoolVar(HiveConf.ConfVars.SPARK_OPTIMIZE_SHUFFLE_SERDE, false); + multiSessionConf.set("spark.local.dir", Paths.get(System.getProperty("test.tmp.dir"), + "TestSparkSessionTimeout-testMultiSparkSessionTimeout-local-dir").toString()); - SessionState.start(conf); - - runTestSparkSessionTimeout(conf, 1); + runTestSparkSessionTimeout(multiSessionConf, "15s", 30, 1); return null; })); } @@ -81,24 +84,12 @@ public void testMultiSessionSparkSessionTimeout() throws InterruptedException, } } - @Test - public void testSparkSessionMultipleTimeout() throws HiveException, InterruptedException, MalformedURLException { - String confDir = "../../data/conf/spark/standalone/hive-site.xml"; - HiveConf.setHiveSiteLocation(new File(confDir).toURI().toURL()); - - HiveConf conf = new HiveConf(); - conf.set("spark.local.dir", Paths.get(System.getProperty("test.tmp.dir"), - "TestSparkSessionTimeout-testSparkSessionMultipleTimeout-local-dir").toString()); - - SessionState.start(conf); - - runTestSparkSessionTimeout(conf, 2); - } + private void runTestSparkSessionTimeout(HiveConf conf, String timeout, long sleepTime, + int sleepRunIteration) throws HiveException, InterruptedException, IOException { + conf.setVar(HiveConf.ConfVars.SPARK_SESSION_TIMEOUT, timeout); + conf.setVar(HiveConf.ConfVars.SPARK_SESSION_TIMEOUT_CHECK_PERIOD, "1s"); - private void runTestSparkSessionTimeout(HiveConf conf, int sleepRunIteration) throws HiveException, - InterruptedException { - conf.setVar(HiveConf.ConfVars.SPARK_SESSION_TIMEOUT, "5s"); - conf.setVar(HiveConf.ConfVars.SPARK_SESSION_TIMEOUT_PERIOD, "1s"); + SessionState ss = SessionState.start(conf); String tableName = "test" + UUID.randomUUID().toString().replace("-", ""); @@ -119,7 +110,7 @@ private void runTestSparkSessionTimeout(HiveConf conf, int sleepRunIteration) th driver.run("select * from " + tableName + " order by col").getResponseCode()); for (int i = 0; i < sleepRunIteration; i++) { - Thread.sleep(10000); + Thread.sleep(sleepTime * 1000); Assert.assertFalse(sparkSession.isOpen()); @@ -131,6 +122,7 @@ private void runTestSparkSessionTimeout(HiveConf conf, int sleepRunIteration) th Assert.assertEquals(0, driver.run("drop table if exists " + tableName).getResponseCode()); driver.destroy(); } + ss.close(); } } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 84316bc7e0..a5f899a070 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
index 1d251edaf7..2a720bef84 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -107,7 +108,7 @@
   /**
    * A {@link Set} of currently running queries. Each job is identified by its query id.
    */
-  private final Set<String> activeJobs = Sets.newConcurrentHashSet();
+  private final Set<String> activeQueries = Sets.newConcurrentHashSet();
 
   private ReadWriteLock closeLock = new ReentrantReadWriteLock();
 
@@ -224,8 +225,9 @@ public void close() {
       }
       hiveSparkClient = null;
       lastSparkJobCompletionTime = 0;
-
       isOpen = false;
+    } else {
+      LOG.info("Ignoring attempt to close Spark session because it has already been closed");
     }
   } finally {
     closeLock.writeLock().unlock();
   }
 }
@@ -330,7 +332,7 @@ public Path getHDFSSessionDir() throws IOException {
 
   @Override
   public void onQuerySubmission(String queryId) {
-    activeJobs.add(queryId);
+    activeQueries.add(queryId);
   }
 
   /**
@@ -338,11 +340,11 @@ public void onQuerySubmission(String queryId) {
    */
   @Override
   public boolean triggerTimeout(long sessionTimeout) {
-    if (hasTimedOut(activeJobs, lastSparkJobCompletionTime, sessionTimeout)) {
+    if (hasTimedOut(activeQueries, lastSparkJobCompletionTime, sessionTimeout)) {
       closeLock.writeLock().lock();
       try {
-        if (hasTimedOut(activeJobs, lastSparkJobCompletionTime, sessionTimeout)) {
+        if (hasTimedOut(activeQueries, lastSparkJobCompletionTime, sessionTimeout)) {
           LOG.warn("Closing Spark session " + getSessionId() + " because a Spark job has not " +
               "been run in the past " + sessionTimeout / 1000 + " seconds");
           close();
@@ -362,9 +364,9 @@ public boolean triggerTimeout(long sessionTimeout) {
    * (2) there can be no actively running Spark jobs, and
    * (3) the last completed Spark job must have been more than sessionTimeout seconds ago.
    */
-  private static boolean hasTimedOut(Set<String> activeJobs,
+  private static boolean hasTimedOut(Set<String> activeQueries,
       long lastSparkJobCompletionTime, long sessionTimeout) {
-    return activeJobs.isEmpty() &&
+    return activeQueries.isEmpty() &&
         lastSparkJobCompletionTime > 0 &&
         (System.currentTimeMillis() - lastSparkJobCompletionTime) > sessionTimeout;
   }
@@ -375,12 +377,46 @@ private static boolean hasTimedOut(Set<String> activeJobs,
    */
   @Override
   public void onQueryCompletion(String queryId) {
-    activeJobs.remove(queryId);
+    activeQueries.remove(queryId);
     lastSparkJobCompletionTime = System.currentTimeMillis();
+
+    long sessionTimeout = conf.getTimeVar(HiveConf.ConfVars.SPARK_SESSION_TIMEOUT,
+        TimeUnit.MILLISECONDS);
+
+    if (sessionTimeout == 0 && activeQueries.isEmpty()) {
+      LOG.warn("Attempting to close Spark session " + getSessionId() + " because " +
+          HiveConf.ConfVars.SPARK_SESSION_TIMEOUT.varname + " is set to " + 0);
+      close();
+    }
   }
 
   @VisibleForTesting
   HiveSparkClient getHiveSparkClient() {
     return hiveSparkClient;
   }
+
+  @VisibleForTesting
+  void setHiveSparkClient(HiveSparkClient hiveSparkClient) {
+    this.hiveSparkClient = hiveSparkClient;
+  }
+
+  @VisibleForTesting
+  void setHiveConf(HiveConf hiveConf) {
+    conf = hiveConf;
+  }
+
+  @VisibleForTesting
+  void setIsOpen(boolean isOpen) {
+    this.isOpen = isOpen;
+  }
+
+  @VisibleForTesting
+  long getLastSparkJobCompletionTime() {
+    return this.lastSparkJobCompletionTime;
+  }
+
+  @VisibleForTesting
+  Set<String> getActiveQueries() {
+    return this.activeQueries;
+  }
 }
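The renamed triggerTimeout(long) keeps its check-lock-recheck shape, which deserves a note: the first hasTimedOut(...) call keeps the common "session still busy" poll lock-free, while the second call, made under the write lock, guards against a query that was submitted between the first check and the lock acquisition. Reduced to its skeleton, the logic of the hunk above is:

    if (hasTimedOut(activeQueries, lastSparkJobCompletionTime, sessionTimeout)) {
      closeLock.writeLock().lock();
      try {
        // Re-check under the lock: a query may have arrived in the meantime.
        if (hasTimedOut(activeQueries, lastSparkJobCompletionTime, sessionTimeout)) {
          close();
          return true;
        }
      } finally {
        closeLock.writeLock().unlock();
      }
    }
    return false;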
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java
index e2f3a11ffa..f4f1b18568 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java
@@ -176,16 +176,20 @@ public void shutdown() {
   private void startTimeoutThread() {
     long sessionTimeout = conf.getTimeVar(HiveConf.ConfVars.SPARK_SESSION_TIMEOUT,
         TimeUnit.MILLISECONDS);
-    long sessionTimeoutPeriod = conf.getTimeVar(HiveConf.ConfVars.SPARK_SESSION_TIMEOUT_PERIOD,
+    long sessionTimeoutPeriod = conf.getTimeVar(HiveConf.ConfVars.SPARK_SESSION_TIMEOUT_CHECK_PERIOD,
         TimeUnit.MILLISECONDS);
     ScheduledExecutorService es = Executors.newSingleThreadScheduledExecutor();
-    // Schedules a thread that does the following: iterates through all the active SparkSessions
-    // and calls #triggerTimeout(long) on each one. If #triggerTimeout(long) returns true, then
-    // the SparkSession is removed from the set of active sessions managed by this class.
-    timeoutFuture = es.scheduleAtFixedRate(() -> createdSessions.stream()
-        .filter(sparkSession -> sparkSession.triggerTimeout(sessionTimeout))
-        .forEach(createdSessions::remove),
-        0, sessionTimeoutPeriod, TimeUnit.MILLISECONDS);
+    // Only schedule the thread if the session timeout is greater than 0. If the timeout is equal
+    // to 0 then the session will be timed out by SparkSession itself as soon as a query completes.
+    if (sessionTimeout > 0) {
+      // Schedules a thread that does the following: iterates through all the active SparkSessions
+      // and calls #triggerTimeout(long) on each one. If #triggerTimeout(long) returns true, then
+      // the SparkSession is removed from the set of active sessions managed by this class.
+      timeoutFuture = es.scheduleAtFixedRate(() -> createdSessions.stream()
+          .filter(sparkSession -> sparkSession.triggerTimeout(sessionTimeout))
+          .forEach(createdSessions::remove),
+          sessionTimeout, sessionTimeoutPeriod, TimeUnit.MILLISECONDS);
+    }
   }
 }
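Two details of this hunk are easy to miss: the periodic check is not scheduled at all when the timeout is 0 (that case is handled inline by onQueryCompletion above), and the initial delay of the scheduled task changed from 0 to sessionTimeout, since no session can time out before one full timeout has elapsed. A rough worst-case bound under the defaults (illustrative arithmetic only, assuming the stock 30m/60s values):

    // hive.spark.session.timeout = 30m, hive.spark.session.timeout.check.period = 60s
    long sessionTimeoutMs = TimeUnit.MINUTES.toMillis(30);  // 1,800,000 ms
    long checkPeriodMs = TimeUnit.SECONDS.toMillis(60);     //    60,000 ms

    // Earliest close: a check fires exactly as the idle timeout elapses.
    long earliestCloseMs = sessionTimeoutMs;                // 30m after the last job
    // Latest close: the timeout elapses just after a check ran, so the session
    // survives until the next check.
    long latestCloseMs = sessionTimeoutMs + checkPeriodMs;  // ~31m after the last job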
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestLocalHiveSparkClient.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestLocalHiveSparkClient.java
index bbf3d9c05f..bb31c6928c 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestLocalHiveSparkClient.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestLocalHiveSparkClient.java
@@ -85,7 +85,7 @@ private Void execute(Integer threadId) {
   private void runSparkTestSession(HiveConf conf, int threadId) throws Exception {
     conf.setVar(HiveConf.ConfVars.SPARK_SESSION_TIMEOUT, "10s");
-    conf.setVar(HiveConf.ConfVars.SPARK_SESSION_TIMEOUT_PERIOD, "1s");
+    conf.setVar(HiveConf.ConfVars.SPARK_SESSION_TIMEOUT_CHECK_PERIOD, "1s");
 
     Driver driver = null;
     try {
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionImplTimeout.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionImplTimeout.java
new file mode 100644
index 0000000000..876940a08c
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionImplTimeout.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark.session;
+
+import static org.mockito.Mockito.mock;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClient;
+import org.apache.hadoop.hive.ql.session.SessionState;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.UUID;
+
+
+public class TestSparkSessionImplTimeout {
+
+  /**
+   * Tests that all variables tracked for the timeout logic are properly updated
+   */
+  @Test
+  public void testSparkSessionTimeout() {
+    SparkSessionImpl sparkSession = createSparkSessionImpl();
+
+    String queryId = "1";
+    sparkSession.onQuerySubmission(queryId);
+
+    Assert.assertTrue(sparkSession.isOpen());
+    Assert.assertEquals(sparkSession.getActiveQueries().size(), 1);
+
+    sparkSession.onQueryCompletion(queryId);
+    Assert.assertEquals(sparkSession.getActiveQueries().size(), 0);
+    Assert.assertTrue(sparkSession.getLastSparkJobCompletionTime() > 0);
+
+    sparkSession.close();
+
+    Assert.assertFalse(sparkSession.isOpen());
+  }
+
+  /**
+   * Tests that a timeout triggered before a query is run does not cause the session to be closed
+   */
+  @Test
+  public void testTimeoutBeforeQuery() {
+    SparkSessionImpl sparkSession = createSparkSessionImpl();
+
+    sparkSession.triggerTimeout(-1);
+
+    Assert.assertTrue(sparkSession.isOpen());
+  }
+
+  /**
+   * Tests that a timeout triggered while a query is running does not cause the session to be
+   * closed
+   */
+  @Test
+  public void testTimeoutDuringQuery() {
+    SparkSessionImpl sparkSession = createSparkSessionImpl();
+
+    sparkSession.onQuerySubmission("1");
+
+    sparkSession.triggerTimeout(-1);
+
+    Assert.assertTrue(sparkSession.isOpen());
+  }
+
+  /**
+   * Tests that a timeout triggered after one query has completed causes the session to be closed
+   */
+  @Test
+  public void testTimeoutAfterQuery() {
+    SparkSessionImpl sparkSession = createSparkSessionImpl();
+
+    String queryId = "1";
+    sparkSession.onQuerySubmission(queryId);
+
+    sparkSession.onQueryCompletion(queryId);
+
+    sparkSession.triggerTimeout(-1);
+
+    Assert.assertFalse(sparkSession.isOpen());
+  }
+
+  /**
+   * Tests that a timeout triggered after one query has completed, but during another query, does
+   * not cause the session to be closed
+   */
+  @Test
+  public void testTimeoutAfterMultipleQueriesDuringQuery() {
+    SparkSessionImpl sparkSession = createSparkSessionImpl();
+
+    String queryId = "1";
+    sparkSession.onQuerySubmission(queryId);
+    sparkSession.onQueryCompletion(queryId);
+
+    queryId = "2";
+    sparkSession.onQuerySubmission(queryId);
+
+    sparkSession.triggerTimeout(-1);
+
+    Assert.assertTrue(sparkSession.isOpen());
+  }
+
+  private SparkSessionImpl createSparkSessionImpl() {
+    HiveConf hiveConf = new HiveConf();
+    SessionState.start(hiveConf);
+
+    SparkSessionImpl sparkSession = new SparkSessionImpl(UUID.randomUUID().toString());
+
+    HiveSparkClient mockHiveSparkClient = mock(HiveSparkClient.class);
+    sparkSession.setHiveSparkClient(mockHiveSparkClient);
+
+    sparkSession.setHiveConf(hiveConf);
+    sparkSession.setIsOpen(true);
+
+    Assert.assertTrue(sparkSession.getActiveQueries().isEmpty());
+
+    return sparkSession;
+  }
+}