diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkSessionTimeout.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkSessionTimeout.java
index d8dd80a1c2..7ede07d841 100644
--- itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkSessionTimeout.java
+++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkSessionTimeout.java
@@ -52,7 +52,7 @@ public void testSparkSessionTimeout() throws HiveException, InterruptedException
 
     SessionState.start(conf);
 
-    runTestSparkSessionTimeout(conf);
+    runTestSparkSessionTimeout(conf, 1);
   }
 
   @Test
@@ -72,7 +72,7 @@ public void testMultiSessionSparkSessionTimeout() throws InterruptedException,
 
         SessionState.start(conf);
 
-        runTestSparkSessionTimeout(conf);
+        runTestSparkSessionTimeout(conf, 1);
         return null;
       }));
     }
@@ -81,7 +81,21 @@ public void testMultiSessionSparkSessionTimeout() throws InterruptedException,
     }
   }
 
-  private void runTestSparkSessionTimeout(HiveConf conf) throws HiveException,
+  @Test
+  public void testSparkSessionMultipleTimeout() throws HiveException, InterruptedException, MalformedURLException {
+    String confDir = "../../data/conf/spark/standalone/hive-site.xml";
+    HiveConf.setHiveSiteLocation(new File(confDir).toURI().toURL());
+
+    HiveConf conf = new HiveConf();
+    conf.set("spark.local.dir", Paths.get(System.getProperty("test.tmp.dir"),
+        "TestSparkSessionTimeout-testSparkSessionMultipleTimeout-local-dir").toString());
+
+    SessionState.start(conf);
+
+    runTestSparkSessionTimeout(conf, 2);
+  }
+
+  private void runTestSparkSessionTimeout(HiveConf conf, int sleepRunIteration) throws HiveException,
       InterruptedException {
     conf.setVar(HiveConf.ConfVars.SPARK_SESSION_TIMEOUT, "5s");
     conf.setVar(HiveConf.ConfVars.SPARK_SESSION_TIMEOUT_PERIOD, "1s");
@@ -104,12 +118,14 @@ private void runTestSparkSessionTimeout(HiveConf conf) throws HiveException,
       Assert.assertEquals(0,
          driver.run("select * from " + tableName + " order by col").getResponseCode());
 
-      Thread.sleep(10000);
+      for (int i = 0; i < sleepRunIteration; i++) {
+        Thread.sleep(10000);
 
-      Assert.assertFalse(sparkSession.isOpen());
+        Assert.assertFalse(sparkSession.isOpen());
 
-      Assert.assertEquals(0,
-          driver.run("select * from " + tableName + " order by col").getResponseCode());
+        Assert.assertEquals(0,
+            driver.run("select * from " + tableName + " order by col").getResponseCode());
+      }
     } finally {
       if (driver != null) {
         Assert.assertEquals(0, driver.run("drop table if exists " + tableName).getResponseCode());
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java
index 8dae54d7bd..e2f3a11ffa 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java
@@ -121,6 +121,7 @@ public SparkSession getSession(SparkSession existingSession, HiveConf conf, bool
     // Open the session if it is closed.
     if (!existingSession.isOpen() && doOpen) {
       existingSession.open(conf);
+      createdSessions.add(existingSession);
     }
     return existingSession;
   }