diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
index 72ff53e..8ecc786 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.hive.ql.exec.DagUtils;
 import org.apache.hive.spark.client.SparkClientUtilities;
-import org.apache.spark.util.CallSite;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.FileSystem;
@@ -62,32 +61,34 @@
  * environment and execute spark work.
  */
 public class LocalHiveSparkClient implements HiveSparkClient {
+  private static final long serialVersionUID = 1L;
-  private static final String MR_JAR_PROPERTY = "tmpjars";
-  protected static final transient Logger LOG = LoggerFactory
-      .getLogger(LocalHiveSparkClient.class);
+  private static final transient Logger LOG = LoggerFactory
+      .getLogger(LocalHiveSparkClient.class);
+  private static final String MR_JAR_PROPERTY = "tmpjars";
 
   private static final Splitter CSV_SPLITTER = Splitter.on(",").omitEmptyStrings();
 
   private static LocalHiveSparkClient client;
-
-  public static synchronized LocalHiveSparkClient getInstance(
-      SparkConf sparkConf, HiveConf hiveConf) throws FileNotFoundException, MalformedURLException {
-    if (client == null) {
-      client = new LocalHiveSparkClient(sparkConf, hiveConf);
-    }
-    return client;
-  }
+  private static int activeSessions = 0;
 
   private final JavaSparkContext sc;
 
   private final List<String> localJars = new ArrayList<String>();
-
   private final List<String> localFiles = new ArrayList<String>();
 
   private final JobMetricsListener jobMetricsListener;
 
+  public static synchronized LocalHiveSparkClient getInstance(SparkConf sparkConf, HiveConf hiveConf)
+      throws FileNotFoundException, MalformedURLException {
+    ++activeSessions;
+    if (client == null) {
+      client = new LocalHiveSparkClient(sparkConf, hiveConf);
+    }
+    return client;
+  }
+
   private LocalHiveSparkClient(SparkConf sparkConf, HiveConf hiveConf)
       throws FileNotFoundException, MalformedURLException {
     String regJar = null;
@@ -239,10 +240,13 @@ private void addJars(String addedJars) {
   @Override
   public void close() {
     synchronized (LocalHiveSparkClient.class) {
-      client = null;
-    }
-    if (sc != null) {
-      sc.stop();
+      if (--activeSessions == 0) {
+        client = null;
+        if (sc != null) {
+          LOG.debug("Shutting down the SparkContext");
+          sc.stop();
+        }
+      }
     }
   }
 }
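For reference, the reference-counting idiom the LocalHiveSparkClient hunks above apply, reduced to a minimal standalone sketch. The class name and the placeholder resource are illustrative, not part of the patch:

// Minimal sketch of reference-counted singleton teardown: every
// getInstance() bumps a counter, and close() only releases the shared
// resource once the last user is gone (in the patch, the SparkContext).
public class RefCountedSingleton {
  private static RefCountedSingleton instance;
  private static int activeSessions = 0;

  public static synchronized RefCountedSingleton getInstance() {
    ++activeSessions;
    if (instance == null) {
      instance = new RefCountedSingleton();
    }
    return instance;
  }

  private RefCountedSingleton() {
    // the expensive shared resource would be created here
  }

  public void close() {
    synchronized (RefCountedSingleton.class) {
      if (--activeSessions == 0) {
        // last session released: drop the singleton so the resource
        // can be recreated by a later getInstance()
        instance = null;
      }
    }
  }
}

Both paths synchronize on the class object, so the counter and the singleton reference are always read and written consistently; a close() from one session no longer kills the SparkContext out from under the others.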
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
index 6a8b42e..f6451e7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
@@ -125,24 +125,21 @@
   @Override
   public void open(HiveConf conf) throws HiveException {
     closeLock.readLock().lock();
+
     try {
-      LOG.info("Trying to open Hive on Spark session {}", sessionId);
-      this.conf = conf;
-      isOpen = true;
-      try {
-        hiveSparkClient = HiveSparkClientFactory.createHiveSparkClient(conf, sessionId,
-            SessionState.get().getSessionId());
-      } catch (Throwable e) {
-        // It's possible that user session is closed while creating Spark client.
-        HiveException he;
-        if (isOpen) {
-          he = getHiveException(e);
-        } else {
-          he = new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_CLOSED_SESSION, sessionId);
+      if (!isOpen) {
+        LOG.info("Trying to open Hive on Spark session {}", sessionId);
+        this.conf = conf;
+
+        try {
+          hiveSparkClient = HiveSparkClientFactory.createHiveSparkClient(conf, sessionId,
+              SessionState.get().getSessionId());
+          isOpen = true;
+        } catch (Throwable e) {
+          throw getHiveException(e);
         }
-        throw he;
+        LOG.info("Hive on Spark session {} successfully opened", sessionId);
       }
-      LOG.info("Hive on Spark session {} successfully opened", sessionId);
     } finally {
       closeLock.readLock().unlock();
     }
@@ -199,12 +196,7 @@ public SparkJobRef submit(DriverContext driverContext, SparkWork sparkWork) thro
 
   @Override
   public boolean isOpen() {
-    closeLock.readLock().lock();
-    try {
-      return isOpen;
-    } finally {
-      closeLock.readLock().unlock();
-    }
+    return isOpen;
   }
 
   @Override
@@ -221,10 +213,10 @@ public String getSessionId() {
   public void close() {
     if (isOpen) {
       closeLock.writeLock().lock();
+
       try {
         if (isOpen) {
           LOG.info("Trying to close Hive on Spark session {}", sessionId);
-          isOpen = false;
           if (hiveSparkClient != null) {
             try {
               hiveSparkClient.close();
@@ -237,6 +229,8 @@ public void close() {
           hiveSparkClient = null;
           queryCompleted = false;
           lastSparkJobCompletionTime = 0;
+
+          isOpen = false;
         }
       } finally {
         closeLock.writeLock().unlock();
       }
@@ -348,17 +342,10 @@ public void onQuerySubmission(String queryId) {
   @Override
   public boolean triggerTimeout(long sessionTimeout) {
     if (hasTimedOut(queryCompleted, activeJobs, lastSparkJobCompletionTime, sessionTimeout)) {
-      closeLock.writeLock().lock();
-      try {
-        if (hasTimedOut(queryCompleted, activeJobs, lastSparkJobCompletionTime, sessionTimeout)) {
-          LOG.warn("Closing Spark session " + getSessionId() + " because a Spark job has not " +
-              "been run in the past " + sessionTimeout / 1000 + " seconds");
-          close();
-          return true;
-        }
-      } finally {
-        closeLock.writeLock().unlock();
-      }
+      LOG.warn("Closing Spark session " + getSessionId() + " because a Spark job has not " +
+          "been run in the past " + sessionTimeout / 1000 + " seconds");
+      close();
+      return true;
     }
     return false;
   }
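For reference, a minimal sketch of the locking discipline the SparkSessionImpl hunks converge on: open() runs under the read lock and only sets the open flag once the client exists, while close() takes the write lock and clears the flag last. The class, the volatile flag, and the placeholder client are illustrative assumptions, not the patched code itself:

import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Sketch of the open/close locking pattern; names are hypothetical.
public class Session {
  private final ReadWriteLock closeLock = new ReentrantReadWriteLock();
  private volatile boolean isOpen = false;
  private Object client;  // stands in for HiveSparkClient

  public void open() throws Exception {
    closeLock.readLock().lock();  // excludes a concurrent close(), which needs the write lock
    try {
      if (!isOpen) {
        client = createClient();  // may throw; the flag then stays false
        isOpen = true;            // set only after the client exists
      }
    } finally {
      closeLock.readLock().unlock();
    }
  }

  public boolean isOpen() {
    return isOpen;  // volatile read of a single flag, no lock taken
  }

  public void close() {
    if (isOpen) {
      closeLock.writeLock().lock();
      try {
        if (isOpen) {        // re-check under the exclusive lock
          client = null;     // tear the client down first
          isOpen = false;    // cleared last, mirroring the patch
        }
      } finally {
        closeLock.writeLock().unlock();
      }
    }
  }

  private Object createClient() throws Exception {
    return new Object();  // placeholder for the expensive client creation
  }
}

Moving the flag updates to the edges of each critical section means a session that failed to open, or is mid-teardown, is never observed as open, which is what lets isOpen() and the timeout path drop their own locking and simply delegate to close().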