From 8be97574626f84fcbd11acc39ab3fadd6a328c6d Mon Sep 17 00:00:00 2001 From: vkorukanti Date: Fri, 15 Aug 2014 00:59:58 -0700 Subject: [PATCH] HIVE-7606: Design SparkSession and SparkSessionManager. --- .../hadoop/hive/ql/exec/spark/SparkTask.java | 24 ++- .../hive/ql/exec/spark/session/SparkSession.java | 56 +++++++ .../ql/exec/spark/session/SparkSessionImpl.java | 77 +++++++++ .../ql/exec/spark/session/SparkSessionManager.java | 65 ++++++++ .../spark/session/SparkSessionManagerImpl.java | 182 +++++++++++++++++++++ .../hadoop/hive/ql/session/SessionState.java | 22 +++ .../spark/session/TestSparkSessionManagerImpl.java | 115 +++++++++++++ .../apache/hive/service/server/HiveServer2.java | 14 ++ 8 files changed, 551 insertions(+), 4 deletions(-) create mode 100644 ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java create mode 100644 ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java create mode 100644 ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManager.java create mode 100644 ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java create mode 100644 ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java index 5ac5a25..1b4f245 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.List; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.DriverContext; @@ -28,10 +29,15 @@ import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.Utilities; +import 
org.apache.hadoop.hive.ql.exec.spark.session.SparkSession; +import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManager; +import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl; +import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.BaseWork; import org.apache.hadoop.hive.ql.plan.ReduceWork; import org.apache.hadoop.hive.ql.plan.SparkWork; import org.apache.hadoop.hive.ql.plan.api.StageType; +import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.util.StringUtils; @@ -50,18 +56,28 @@ public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext driverC public int execute(DriverContext driverContext) { int rc = 1; - SparkClient client = null; + SparkSession sparkSession = null; + SparkSessionManager sparkSessionManager = null; try { configureNumberOfReducers(); - client = SparkClient.getInstance(driverContext.getCtx().getConf()); - rc = client.execute(driverContext, getWork()); + sparkSessionManager = SparkSessionManagerImpl.getInstance(); + sparkSession = SessionState.get().getSparkSession(); + sparkSession = sparkSessionManager.getSession(sparkSession, conf, true); + SessionState.get().setSparkSession(sparkSession); + + rc = sparkSession.submit(driverContext, getWork()); } catch (Exception e) { LOG.error("Failed to execute spark task.", e); return 1; } finally { - if (client != null) { + if (sparkSession != null) { rc = close(rc); + try { + sparkSessionManager.returnSession(sparkSession); + } catch(HiveException ex) { + LOG.error("Failed to return the session to SessionManager", ex); + } } } return rc; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java new file mode 100644 index 0000000..02456fc --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java @@ -0,0 +1,56 @@ +/** 
+ * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.spark.session; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.DriverContext; +import org.apache.hadoop.hive.ql.plan.SparkWork; + +public interface SparkSession { + /** + * Initializes a Spark session for DAG execution. + */ + public void open(HiveConf conf); + + /** + * Submit given sparkWork to SparkClient + * @param driverContext + * @param sparkWork + */ + public int submit(DriverContext driverContext, SparkWork sparkWork); + + /** + * Is the session open and ready to submit jobs? + */ + public boolean isOpen(); + + /** + * Return configuration. + */ + public HiveConf getConf(); + + /** + * Return session id. 
+ */ + public String getSessionId(); + + /** + * Close session and release resources + */ + public void close(); +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java new file mode 100644 index 0000000..3cb5c1a --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.spark.session; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.DriverContext; +import org.apache.hadoop.hive.ql.exec.spark.SparkClient; +import org.apache.hadoop.hive.ql.plan.SparkWork; + +import java.util.UUID; + +/** + * Simple implementation of SparkSession which currently just submits jobs to + * SparkClient which is shared by all SparkSession instances. 
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hive.ql.exec.spark.session;

import com.google.common.base.Preconditions;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.DriverContext;
import org.apache.hadoop.hive.ql.exec.spark.SparkClient;
import org.apache.hadoop.hive.ql.plan.SparkWork;

import java.util.UUID;

/**
 * Simple implementation of SparkSession which currently just submits jobs to
 * the SparkClient singleton, which is shared by all SparkSession instances.
 */
public class SparkSessionImpl implements SparkSession {

  private HiveConf conf;
  // volatile: in HiveServer2 a session may be opened/closed and its state
  // polled from different threads; guarantee visibility of the flag.
  private volatile boolean isOpen = false;
  // Assigned once at construction and never changes.
  private final String sessionId;

  public SparkSessionImpl() {
    sessionId = makeSessionId();
  }

  /**
   * Marks the session open and records the configuration to use for
   * subsequent submissions.
   */
  @Override
  public void open(HiveConf conf) {
    this.conf = conf;
    isOpen = true;
  }

  /**
   * Submits the given SparkWork to the shared SparkClient.
   *
   * @throws IllegalStateException if the session is not open
   */
  @Override
  public int submit(DriverContext driverContext, SparkWork sparkWork) {
    Preconditions.checkState(isOpen, "Session is not open. Can't submit jobs.");
    return SparkClient.getInstance(driverContext.getCtx().getConf())
        .execute(driverContext, sparkWork);
  }

  @Override
  public boolean isOpen() {
    return isOpen;
  }

  @Override
  public HiveConf getConf() {
    return conf;
  }

  @Override
  public String getSessionId() {
    return sessionId;
  }

  /**
   * Marks the session closed. The configuration is intentionally retained so
   * the session manager can still inspect it (e.g. for user matching).
   */
  @Override
  public void close() {
    isOpen = false;
  }

  /** @return a fresh globally-unique session id */
  public static String makeSessionId() {
    return UUID.randomUUID().toString();
  }
}
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.spark.session; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.metadata.HiveException; + +/** + * Defines interface for managing multiple SparkSessions in Hive when multiple users + * are executing queries simultaneously on Spark execution engine. + */ +public interface SparkSessionManager { + /** + * Initialize based on given configuration. + * + * @param hiveConf + */ + public void setup(HiveConf hiveConf) throws HiveException; + + /** + * Get a valid SparkSession. First try to check if existing session is reusable + * based on the given conf. If not release existingSession and return + * a new session based on session manager criteria and conf. + * + * @param existingSession Existing session (can be null) + * @param conf + * @param doOpen Should the session be opened before returning? + * @return + */ + public SparkSession getSession(SparkSession existingSession, HiveConf conf, + boolean doOpen) throws HiveException; + + /** + * Return the given sparkSession to pool. This is used when the client + * still holds references to session and may want to reuse it in future. + * When client wants to reuse the session, it should pass the it getSession method. + */ + public void returnSession(SparkSession sparkSession) throws HiveException; + + /** + * Close the given session and return it to pool. This is used when the client + * no longer needs a SparkSession. + */ + public void closeSession(SparkSession sparkSession) throws HiveException; + + /** + * Shutdown the session manager. Also closing up SparkSessions in pool. 
+ */ + public void shutdown(); +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java new file mode 100644 index 0000000..2bf9d8c --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java @@ -0,0 +1,182 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.spark.session; + +import com.google.common.base.Preconditions; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.shims.ShimLoader; +import org.apache.hadoop.security.UserGroupInformation; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; + +/** + * Simple implementation of SparkSessionManager + * - returns SparkSession when requested through getSession and keeps track of + * created sessions. Currently no limit on the number sessions. 
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hive.ql.exec.spark.session;

import com.google.common.base.Preconditions;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.security.UserGroupInformation;

import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

/**
 * Simple implementation of SparkSessionManager:
 * - returns a SparkSession when requested through getSession and keeps track
 *   of created sessions. Currently there is no limit on the number of sessions.
 * - a SparkSession is reused if the user name in the new conf and the user
 *   name in the session conf match.
 */
public class SparkSessionManagerImpl implements SparkSessionManager {
  private static final Log LOG = LogFactory.getLog(SparkSessionManagerImpl.class);

  // All sessions handed out and not yet closed. Typed set: a raw Set would
  // not compile where elements are read back as SparkSession.
  private Set<SparkSession> createdSessions;
  private boolean inited;

  private static SparkSessionManagerImpl instance;

  static {
    // Best-effort cleanup of live sessions on JVM exit.
    Runtime.getRuntime().addShutdownHook(new Thread() {
      @Override
      public void run() {
        try {
          if (instance != null) {
            instance.shutdown();
          }
        } catch (Exception e) {
          // ignore: nothing useful can be done during JVM shutdown
        }
      }
    });
  }

  /** @return the process-wide singleton, creating it on first use */
  public synchronized static SparkSessionManagerImpl getInstance()
      throws HiveException {
    if (instance == null) {
      instance = new SparkSessionManagerImpl();
    }

    return instance;
  }

  private SparkSessionManagerImpl() {
  }

  @Override
  public void setup(HiveConf hiveConf) throws HiveException {
    LOG.debug("Setting up the session manager.");
    init();
  }

  private void init() {
    createdSessions = Collections.synchronizedSet(new HashSet<SparkSession>());
    inited = true;
  }

  /**
   * If the existingSession can be reused, return it.
   * Otherwise:
   * - close it and remove it from the list;
   * - create a new session and add it to the list.
   */
  @Override
  public SparkSession getSession(SparkSession existingSession, HiveConf conf,
      boolean doOpen) throws HiveException {
    if (!inited) {
      // Lazy init covers callers (e.g. CLI) that never invoke setup().
      init();
    }

    if (existingSession != null) {
      if (canReuseSession(existingSession, conf)) {
        // Open the session if it is closed.
        if (!existingSession.isOpen() && doOpen) {
          existingSession.open(conf);
        }

        Preconditions.checkState(createdSessions.contains(existingSession));
        if (LOG.isDebugEnabled()) {
          LOG.debug(String.format("Existing session (%s) is reused.",
              existingSession.getSessionId()));
        }
        return existingSession;
      } else {
        // Close the session, as the client is holding onto a session that
        // can't be used by the client anymore.
        closeSession(existingSession);
      }
    }

    SparkSession sparkSession = new SparkSessionImpl();
    createdSessions.add(sparkSession);

    if (doOpen) {
      sparkSession.open(conf);
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug(String.format("New session (%s) is created.", sparkSession.getSessionId()));
    }
    return sparkSession;
  }

  /**
   * Currently we only match the user names in the existing session's conf and
   * the given conf.
   */
  private boolean canReuseSession(SparkSession existingSession, HiveConf conf)
      throws HiveException {
    try {
      UserGroupInformation newUgi = ShimLoader.getHadoopShims().getUGIForConf(conf);
      String newUserName = ShimLoader.getHadoopShims().getShortUserName(newUgi);

      UserGroupInformation ugiInSession =
          ShimLoader.getHadoopShims().getUGIForConf(existingSession.getConf());
      String userNameInSession = ShimLoader.getHadoopShims().getShortUserName(ugiInSession);

      return newUserName.equals(userNameInSession);
    } catch (Exception ex) {
      throw new HiveException("Failed to get user info from HiveConf.", ex);
    }
  }

  @Override
  public void returnSession(SparkSession sparkSession) throws HiveException {
    // In this particular SparkSessionManager implementation, we don't recycle
    // returned sessions. References to the session remain valid.
  }

  @Override
  public void closeSession(SparkSession sparkSession) throws HiveException {
    if (sparkSession == null) {
      return;
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug(String.format("Closing session (%s).", sparkSession.getSessionId()));
    }
    sparkSession.close();
    createdSessions.remove(sparkSession);
  }

  @Override
  public void shutdown() {
    LOG.debug("Closing the session manager.");
    if (createdSessions != null) {
      // Must hold the wrapper's monitor while iterating a synchronizedSet.
      synchronized (createdSessions) {
        Iterator<SparkSession> it = createdSessions.iterator();
        while (it.hasNext()) {
          SparkSession session = it.next();
          session.close();
        }
        // Drop references to closed sessions so the manager can be reused
        // cleanly after a later setup()/getSession().
        createdSessions.clear();
      }
    }
    inited = false;
  }
}
*/ @@ -1063,6 +1067,16 @@ public void close() throws IOException { tezSessionState = null; } + if (sparkSession != null) { + try { + SparkSessionManagerImpl.getInstance().closeSession(sparkSession); + } catch(Exception ex) { + LOG.error("Error closing spark session."); + } finally { + sparkSession = null; + } + } + dropSessionPaths(conf); } @@ -1153,4 +1167,12 @@ public void setUserIpAddress(String userIpAddress) { this.userIpAddress = userIpAddress; } + + public SparkSession getSparkSession() { + return sparkSession; + } + + public void setSparkSession(SparkSession sparkSession) { + this.sparkSession = sparkSession; + } } diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java new file mode 100644 index 0000000..a5fd7c5 --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hive.ql.exec.spark.session;

import org.apache.hadoop.hive.conf.HiveConf;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

public class TestSparkSessionManagerImpl {
  private SparkSessionManagerImpl sessionManagerHS2 = null;
  private Random random = new Random(3573845);

  /** Tests CLI scenario where we get a single session and use it multiple times. */
  @Test
  public void testSingleSessionMultipleUse() throws Exception {
    HiveConf conf = new HiveConf();

    SparkSessionManager sessionManager = SparkSessionManagerImpl.getInstance();
    SparkSession sparkSession1 = sessionManager.getSession(null, conf, true);

    assertTrue(sparkSession1.isOpen());

    SparkSession sparkSession2 = sessionManager.getSession(sparkSession1, conf, true);
    assertTrue(sparkSession1 == sparkSession2); // Same session object is expected.

    assertTrue(sparkSession2.isOpen());
    // Close the session before shutting the manager down; the original order
    // attempted to close a session on an already shut-down manager.
    sessionManager.closeSession(sparkSession1);
    sessionManager.shutdown();
  }

  /**
   * Tests multi-user scenario (like HiveServer2) where each user gets a session
   * and uses it multiple times.
   */
  @Test
  public void testMultiSessionMultipleUse() throws Exception {
    sessionManagerHS2 = SparkSessionManagerImpl.getInstance();

    // Shutdown existing session manager
    sessionManagerHS2.shutdown();

    HiveConf hiveConf = new HiveConf();
    sessionManagerHS2.setup(hiveConf);

    // Typed list: the raw List in the original relied on mangled generics.
    List<Thread> threadList = new ArrayList<Thread>();
    for (int i = 0; i < 10; i++) {
      Thread t = new Thread(new SessionThread(), "Session thread " + i);
      t.start();
      threadList.add(t);
    }

    for (Thread t : threadList) {
      try {
        t.join();
      } catch (InterruptedException e) {
        e.printStackTrace();
        fail();
      }
    }

    System.out.println("Ending SessionManagerHS2");
    sessionManagerHS2.shutdown();
  }

  /** Thread simulating a user session in HiveServer2. */
  public class SessionThread implements Runnable {

    @Override
    public void run() {
      try {
        String threadName = Thread.currentThread().getName();
        System.out.println(threadName + " started.");
        HiveConf conf = new HiveConf();
        SparkSession prevSession = null;
        SparkSession currentSession = null;

        for (int i = 0; i < 5; i++) {
          currentSession = sessionManagerHS2.getSession(prevSession, conf, true);
          // Same user/conf, so the manager must hand back the same session.
          assertTrue(prevSession == null || prevSession == currentSession);
          assertTrue(currentSession.isOpen());
          System.out.println(String.format("%s got session (%d): %s",
              threadName, i, currentSession.getSessionId()));
          Thread.sleep((random.nextInt(3) + 1) * 1000);

          sessionManagerHS2.returnSession(currentSession);
          prevSession = currentSession;
        }
        sessionManagerHS2.closeSession(currentSession);
        System.out.println(threadName + " ended.");
      } catch (Exception e) {
        e.printStackTrace();
      }
    }
  }
}
ex.printStackTrace(); + } + } } private static void startHiveServer2() throws Throwable { @@ -104,6 +114,10 @@ private static void startHiveServer2() throws Throwable { sessionPool.setupPool(hiveConf); sessionPool.startPool(); } + + if (hiveConf.getVar(ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) { + SparkSessionManagerImpl.getInstance().setup(hiveConf); + } break; } catch (Throwable throwable) { if(++attempts >= maxAttempts) { -- 1.8.5.2 (Apple Git-48)