diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
index 72ff53e3bd..6c6122a88c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.hive.ql.exec.DagUtils;
 import org.apache.hive.spark.client.SparkClientUtilities;
-import org.apache.spark.util.CallSite;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.FileSystem;
@@ -62,32 +61,34 @@
  * environment and execute spark work.
  */
 public class LocalHiveSparkClient implements HiveSparkClient {
+  private static final long serialVersionUID = 1L;
 
-  private static final String MR_JAR_PROPERTY = "tmpjars";
-  protected static final transient Logger LOG = LoggerFactory
-      .getLogger(LocalHiveSparkClient.class);
+  private static final transient Logger LOG = LoggerFactory
+      .getLogger(LocalHiveSparkClient.class);
+  private static final String MR_JAR_PROPERTY = "tmpjars";
 
   private static final Splitter CSV_SPLITTER = Splitter.on(",").omitEmptyStrings();
 
   private static LocalHiveSparkClient client;
-
-  public static synchronized LocalHiveSparkClient getInstance(
-      SparkConf sparkConf, HiveConf hiveConf) throws FileNotFoundException, MalformedURLException {
-    if (client == null) {
-      client = new LocalHiveSparkClient(sparkConf, hiveConf);
-    }
-    return client;
-  }
+  private int activeSessions = 0;
 
   private final JavaSparkContext sc;
 
   private final List<String> localJars = new ArrayList<String>();
-  private final List<String> localFiles = new ArrayList<String>();
 
   private final JobMetricsListener jobMetricsListener;
 
+  public static synchronized LocalHiveSparkClient getInstance(SparkConf sparkConf, HiveConf hiveConf)
+      throws FileNotFoundException, MalformedURLException {
+    if (client == null) {
+      client = new LocalHiveSparkClient(sparkConf, hiveConf);
+    }
+    ++client.activeSessions;
+    return client;
+  }
+
   private LocalHiveSparkClient(SparkConf sparkConf, HiveConf hiveConf)
       throws FileNotFoundException, MalformedURLException {
     String regJar = null;
@@ -239,10 +240,13 @@ private void addJars(String addedJars) {
   @Override
   public void close() {
     synchronized (LocalHiveSparkClient.class) {
-      client = null;
-    }
-    if (sc != null) {
-      sc.stop();
+      if (--activeSessions == 0) {
+        client = null;
+        if (sc != null) {
+          LOG.debug("Shutting down the SparkContext");
+          sc.stop();
+        }
+      }
     }
   }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
index bb50129518..1d251edaf7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
@@ -109,11 +109,6 @@
    */
   private final Set<String> activeJobs = Sets.newConcurrentHashSet();
 
-  /**
-   * True if at least a single query has been run by this session, false otherwise.
-   */
-  private volatile boolean queryCompleted;
-
   private ReadWriteLock closeLock = new ReentrantReadWriteLock();
 
   SparkSessionImpl(String sessionId) {
@@ -123,27 +118,26 @@
 
   @Override
   public void open(HiveConf conf) throws HiveException {
-    closeLock.readLock().lock();
+    closeLock.writeLock().lock();
+    try {
-      LOG.info("Trying to open Hive on Spark session {}", sessionId);
-      this.conf = conf;
-      isOpen = true;
-      try {
-        hiveSparkClient = HiveSparkClientFactory.createHiveSparkClient(conf, sessionId,
-            SessionState.get().getSessionId());
-      } catch (Throwable e) {
-        // It's possible that user session is closed while creating Spark client.
-        HiveException he;
-        if (isOpen) {
-          he = getHiveException(e);
-        } else {
-          he = new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_CLOSED_SESSION, sessionId);
+      if (!isOpen) {
+        LOG.info("Trying to open Hive on Spark session {}", sessionId);
+        this.conf = conf;
+
+        try {
+          hiveSparkClient = HiveSparkClientFactory.createHiveSparkClient(conf, sessionId,
+              SessionState.get().getSessionId());
+          isOpen = true;
+        } catch (Throwable e) {
+          throw getHiveException(e);
         }
-        throw he;
+        LOG.info("Hive on Spark session {} successfully opened", sessionId);
+      } else {
+        LOG.info("Hive on Spark session {} is already opened", sessionId);
       }
-      LOG.info("Hive on Spark session {} successfully opened", sessionId);
     } finally {
-      closeLock.readLock().unlock();
+      closeLock.writeLock().unlock();
     }
   }
@@ -198,12 +192,7 @@ public SparkJobRef submit(DriverContext driverContext, SparkWork sparkWork) throws Exception {
 
   @Override
   public boolean isOpen() {
-    closeLock.readLock().lock();
-    try {
-      return isOpen;
-    } finally {
-      closeLock.readLock().unlock();
-    }
+    return isOpen;
   }
 
   @Override
@@ -220,10 +209,10 @@ public String getSessionId() {
   public void close() {
     if (isOpen) {
       closeLock.writeLock().lock();
+      try {
       if (isOpen) {
         LOG.info("Trying to close Hive on Spark session {}", sessionId);
-        isOpen = false;
         if (hiveSparkClient != null) {
           try {
             hiveSparkClient.close();
@@ -234,8 +223,9 @@ public void close() {
           }
         }
         hiveSparkClient = null;
-        queryCompleted = false;
         lastSparkJobCompletionTime = 0;
+
+        isOpen = false;
       }
     } finally {
       closeLock.writeLock().unlock();
@@ -348,10 +338,11 @@ public void onQuerySubmission(String queryId) {
    */
   @Override
   public boolean triggerTimeout(long sessionTimeout) {
-    if (hasTimedOut(queryCompleted, activeJobs, lastSparkJobCompletionTime, sessionTimeout)) {
+    if (hasTimedOut(activeJobs, lastSparkJobCompletionTime, sessionTimeout)) {
       closeLock.writeLock().lock();
+      try {
-      if (hasTimedOut(queryCompleted, activeJobs, lastSparkJobCompletionTime, sessionTimeout)) {
+      if (hasTimedOut(activeJobs, lastSparkJobCompletionTime, sessionTimeout)) {
         LOG.warn("Closing Spark session " + getSessionId() + " because a Spark job has not " +
             "been run in the past " + sessionTimeout / 1000 + " seconds");
         close();
@@ -366,28 +357,24 @@ public boolean triggerTimeout(long sessionTimeout) {
 
   /**
    * Returns true if a session has timed out, false otherwise. The following conditions must be met
-   * in order to consider a session as timed out: (1) the session must have run at least one
-   * query, (2) there can be no actively running Spark jobs, and (3) the last completed Spark job
-   * must have been more than sessionTimeout seconds ago.
+   * in order to consider a session as timed out:
+   * (1) the session must have run at least one query (i.e. lastSparkJobCompletionTime > 0),
+   * (2) there can be no actively running Spark jobs, and
+   * (3) the last completed Spark job must have been more than sessionTimeout seconds ago.
    */
-  private static boolean hasTimedOut(boolean queryCompleted, Set<String> activeJobs,
+  private static boolean hasTimedOut(Set<String> activeJobs,
       long lastSparkJobCompletionTime, long sessionTimeout) {
-    return queryCompleted &&
-        activeJobs.isEmpty() &&
+    return activeJobs.isEmpty() &&
         lastSparkJobCompletionTime > 0 &&
         (System.currentTimeMillis() - lastSparkJobCompletionTime) > sessionTimeout;
   }
 
   /**
-   * When this session completes the execution of a query, set the {@link #queryCompleted} flag
-   * to true if it hasn't already been set, remove the query from the list of actively running jobs,
+   * When this session completes the execution of a query, remove the query from the list of
+   * actively running jobs,
    * and set the {@link #lastSparkJobCompletionTime} to the current timestamp.
    */
   @Override
   public void onQueryCompletion(String queryId) {
-    if (!queryCompleted) {
-      queryCompleted = true;
-    }
     activeJobs.remove(queryId);
     lastSparkJobCompletionTime = System.currentTimeMillis();
   }
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestLocalHiveSparkClient.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestLocalHiveSparkClient.java
new file mode 100644
index 0000000000..bbf3d9c05f
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestLocalHiveSparkClient.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession;
+import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionImpl;
+import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.spark.SparkContext;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.lang.reflect.Field;
+import java.net.MalformedURLException;
+import java.nio.file.Paths;
+import java.util.List;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.junit.Assert.fail;
+
+/**
+ * With a local Spark context, all user sessions share the same SparkContext.
+ */ +public class TestLocalHiveSparkClient { + + private final CyclicBarrier barrier = new CyclicBarrier(2); + + @Test + public void testMultiSessionSparkContextReUse() throws MalformedURLException { + String confDir = "../data/conf/spark/local/hive-site.xml"; + HiveConf.setHiveSiteLocation(new File(confDir).toURI().toURL()); + + ExecutorService executor = Executors.newFixedThreadPool(barrier.getParties()); + + List> futures = + IntStream.range(0, barrier.getParties()).boxed() + .map(i -> CompletableFuture.supplyAsync(() -> execute(i), executor)) + .collect(Collectors.toList()); + + futures.forEach(CompletableFuture::join); + } + + private Void execute(Integer threadId) { + HiveConf conf = new HiveConf(); + + conf.setBoolVar(HiveConf.ConfVars.SPARK_OPTIMIZE_SHUFFLE_SERDE, false); + conf.set("spark.local.dir", Paths.get(System.getProperty("test.tmp.dir"), + "TestLocalHiveSparkClient-testMultiSessionSparkContextReuse-local-dir").toString()); + + SessionState.start(conf); + try{ + runSparkTestSession(conf, threadId); + } catch (Exception ex){ + fail(ex.getMessage()); + } + return null; + } + + private void runSparkTestSession(HiveConf conf, int threadId) throws Exception { + conf.setVar(HiveConf.ConfVars.SPARK_SESSION_TIMEOUT, "10s"); + conf.setVar(HiveConf.ConfVars.SPARK_SESSION_TIMEOUT_PERIOD, "1s"); + + Driver driver = null; + try { + driver = new Driver(new QueryState.Builder() + .withGenerateNewQueryId(true) + .withHiveConf(conf).build(), null, null); + + SparkSession sparkSession = SparkUtilities.getSparkSession(conf, + SparkSessionManagerImpl.getInstance()); + + Assert.assertEquals(0, driver.run("show tables").getResponseCode()); + barrier.await(); + + SparkContext sparkContext = getSparkContext(sparkSession); + Assert.assertFalse(sparkContext.isStopped()); + + if(threadId == 1) { + barrier.await(); + closeSparkSession(sparkSession); + Assert.assertTrue(sparkContext.isStopped()); + + } else { + closeSparkSession(sparkSession); + Assert.assertFalse(sparkContext.isStopped()); + barrier.await(); + } + } finally { + if (driver != null) { + driver.destroy(); + } + } + } + + private void closeSparkSession(SparkSession session) throws ReflectiveOperationException { + Assert.assertTrue(session.isOpen()); + session.close(); + + Assert.assertFalse(session.isOpen()); + } + + private SparkContext getSparkContext(SparkSession sparkSession) throws ReflectiveOperationException { + HiveSparkClient sparkClient = getSparkClient(sparkSession); + Assert.assertNotNull(sparkClient); + + return getSparkContext(sparkClient).sc(); + } + + private JavaSparkContext getSparkContext(HiveSparkClient sparkClient) throws ReflectiveOperationException { + Field sparkContextField = LocalHiveSparkClient.class.getDeclaredField("sc"); + sparkContextField.setAccessible(true); + + return (JavaSparkContext) sparkContextField.get(sparkClient); + } + + private HiveSparkClient getSparkClient(SparkSession sparkSession) throws ReflectiveOperationException { + Field clientField = SparkSessionImpl.class.getDeclaredField("hiveSparkClient"); + clientField.setAccessible(true); + + return (HiveSparkClient) clientField.get(sparkSession); + } +}