commit 5384993648e21e17f098e483c99ea5f7708fce58
Author: Sahil Takiar
Date:   Sun Apr 1 21:21:43 2018 -0500

    Set HoS session id to an incrementing integer instead of a UUID

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
index c22fb8923d..2532171d13 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
@@ -63,8 +63,9 @@
   @VisibleForTesting
   public static final String SPARK_CLONE_CONFIGURATION = "spark.hadoop.cloneConf";
 
-  public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf, String sessionId) throws Exception {
-    Map<String, String> sparkConf = initiateSparkConf(hiveconf, sessionId);
+  public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf, String sparkSessionId,
+      String hiveSessionId) throws Exception {
+    Map<String, String> sparkConf = initiateSparkConf(hiveconf, hiveSessionId);
 
     // Submit spark job through local spark context while spark master is local mode, otherwise submit
     // spark job through remote spark context.
@@ -73,11 +74,11 @@ public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf, String se
       // With local spark context, all user sessions share the same spark context.
       return LocalHiveSparkClient.getInstance(generateSparkConf(sparkConf), hiveconf);
     } else {
-      return new RemoteHiveSparkClient(hiveconf, sparkConf, sessionId);
+      return new RemoteHiveSparkClient(hiveconf, sparkConf, hiveSessionId + "_" + sparkSessionId);
     }
   }
 
-  public static Map<String, String> initiateSparkConf(HiveConf hiveConf, String sessionId) {
+  public static Map<String, String> initiateSparkConf(HiveConf hiveConf, String hiveSessionId) {
     Map<String, String> sparkConf = new HashMap<String, String>();
     HBaseConfiguration.addHbaseResources(hiveConf);
@@ -85,9 +86,9 @@ public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf, String se
     sparkConf.put("spark.master", SPARK_DEFAULT_MASTER);
     final String appNameKey = "spark.app.name";
     String appName = hiveConf.get(appNameKey);
-    final String sessionIdString = " (sessionId = " + sessionId + ")";
+    final String sessionIdString = " (hiveSessionId = " + hiveSessionId + ")";
     if (appName == null) {
-      if (sessionId == null) {
+      if (hiveSessionId == null) {
         appName = SPARK_DEFAULT_APP_NAME;
       } else {
         appName = SPARK_DEFAULT_APP_NAME + sessionIdString;
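With this patch the id handed to RemoteHiveSparkClient is no longer the bare session id but the concatenation of the Hive session id and the per-session Spark session id. A minimal, self-contained sketch of the resulting naming scheme (the class name and stand-in values below are illustrative, not part of the patch):

    import java.util.UUID;

    // Sketch of the client id format after this change: "<hiveSessionId>_<sparkSessionId>".
    // In Hive, hiveSessionId comes from SessionState#getSessionId() (a UUID string) and
    // sparkSessionId from the new SessionState#getNewSparkSessionId() counter.
    public class RemoteClientIdSketch {
      public static void main(String[] args) {
        String hiveSessionId = UUID.randomUUID().toString(); // stand-in for SessionState.get().getSessionId()
        String sparkSessionId = "0";                         // first Spark session in this Hive session
        System.out.println(hiveSessionId + "_" + sparkSessionId);
      }
    }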
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
index 2d5d03ee71..75b112d00d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
@@ -73,8 +73,8 @@
   private final Object dirLock = new Object();
   private String matchedString = null;
 
-  public SparkSessionImpl() {
-    sessionId = makeSessionId();
+  SparkSessionImpl(String sessionId) {
+    this.sessionId = sessionId;
     initErrorPatterns();
   }
 
@@ -84,7 +84,8 @@ public void open(HiveConf conf) throws HiveException {
     this.conf = conf;
     isOpen = true;
     try {
-      hiveSparkClient = HiveSparkClientFactory.createHiveSparkClient(conf, sessionId);
+      hiveSparkClient = HiveSparkClientFactory.createHiveSparkClient(conf, sessionId,
+          SessionState.get().getSessionId());
     } catch (Throwable e) {
       // It's possible that user session is closed while creating Spark client.
       HiveException he;
@@ -260,10 +261,6 @@ public Path getHDFSSessionDir() throws IOException {
     return scratchDir;
   }
 
-  public static String makeSessionId() {
-    return UUID.randomUUID().toString();
-  }
-
   @VisibleForTesting
   HiveSparkClient getHiveSparkClient() {
     return hiveSparkClient;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java
index 46cee0d903..68c9e045f1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java
@@ -24,6 +24,7 @@
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hive.common.util.ShutdownHookManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -110,7 +111,7 @@ public SparkSession getSession(SparkSession existingSession, HiveConf conf, bool
       return existingSession;
     }
 
-    SparkSession sparkSession = new SparkSessionImpl();
+    SparkSession sparkSession = new SparkSessionImpl(SessionState.get().getNewSparkSessionId());
     if (doOpen) {
       sparkSession.open(conf);
     }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
index 6003ced27e..47b5a4d565 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
@@ -41,6 +41,7 @@
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CancellationException;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.commons.lang.StringUtils;
@@ -297,6 +298,8 @@
   private List<Closeable> cleanupItems = new LinkedList<Closeable>();
 
+  private final AtomicLong sparkSessionId = new AtomicLong();
+
   public HiveConf getConf() {
     return sessionConf;
   }
@@ -2005,6 +2008,9 @@ public void addCleanupItem(Closeable item) {
     return currentFunctionsInUse;
   }
 
+  public String getNewSparkSessionId() {
+    return Long.toString(this.sparkSessionId.getAndIncrement());
+  }
 }
 
 class ResourceMaps {
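The counter added to SessionState is per Hive session, so Spark session ids restart at 0 in every new Hive session rather than being globally unique UUIDs. A standalone sketch of that semantics (the wrapper class below is illustrative, not Hive code):

    import java.util.concurrent.atomic.AtomicLong;

    // Mirrors the new SessionState logic: each Hive session owns its own AtomicLong,
    // so ids run "0", "1", "2", ... within a session, and two different sessions can
    // hand out the same id independently.
    public class SparkSessionIdSketch {
      private final AtomicLong sparkSessionId = new AtomicLong();

      public String getNewSparkSessionId() {
        return Long.toString(sparkSessionId.getAndIncrement());
      }

      public static void main(String[] args) {
        SparkSessionIdSketch hiveSession1 = new SparkSessionIdSketch();
        SparkSessionIdSketch hiveSession2 = new SparkSessionIdSketch();
        System.out.println(hiveSession1.getNewSparkSessionId()); // prints 0
        System.out.println(hiveSession1.getNewSparkSessionId()); // prints 1
        System.out.println(hiveSession2.getNewSparkSessionId()); // prints 0 again
      }
    }

This is exactly the behavior the new testGetSessionId test below asserts.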
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java
index fe95ce0a85..d4e06d1e8e 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java
@@ -18,12 +18,16 @@
 package org.apache.hadoop.hive.ql.exec.spark.session;
 
 import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.junit.BeforeClass;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.util.StringUtils;
 import org.junit.Test;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
@@ -44,6 +48,12 @@
   private SparkSessionManagerImpl sessionManagerHS2 = null;
   private boolean anyFailedSessionThread; // updated only when a thread has failed.
 
+  private static HiveConf SESSION_HIVE_CONF = new HiveConf();
+
+  @BeforeClass
+  public static void setup() {
+    SessionState.start(SESSION_HIVE_CONF);
+  }
 
   /** Tests CLI scenario where we get a single session and use it multiple times. */
@@ -83,7 +93,7 @@ public void testMultiSessionMultipleUse() throws Exception {
     List<Thread> threadList = new ArrayList<Thread>();
     for (int i = 0; i < 10; i++) {
-      Thread t = new Thread(new SessionThread(), "Session thread " + i);
+      Thread t = new Thread(new SessionThread(SessionState.get()), "Session thread " + i);
       t.start();
       threadList.add(t);
     }
@@ -187,6 +197,23 @@ public void testGetHiveException() throws Exception {
     checkHiveException(ss, e, ErrorMsg.SPARK_CREATE_CLIENT_ERROR);
   }
 
+  @Test
+  public void testGetSessionId() throws HiveException {
+    SessionState ss = SessionState.start(SESSION_HIVE_CONF);
+    SparkSessionManager ssm = SparkSessionManagerImpl.getInstance();
+
+    ss.setSparkSession(ssm.getSession(null, SESSION_HIVE_CONF, true));
+    assertEquals("0", ss.getSparkSession().getSessionId());
+
+    ss.setSparkSession(ssm.getSession(null, SESSION_HIVE_CONF, true));
+    assertEquals("1", ss.getSparkSession().getSessionId());
+
+    ss = SessionState.start(SESSION_HIVE_CONF);
+
+    ss.setSparkSession(ssm.getSession(null, SESSION_HIVE_CONF, true));
+    assertEquals("0", ss.getSparkSession().getSessionId());
+  }
+
   private void checkHiveException(SparkSessionImpl ss, Throwable e, ErrorMsg expectedErrMsg) {
     checkHiveException(ss, e, expectedErrMsg, null);
   }
@@ -222,10 +249,16 @@ private void checkSparkConf(HiveConf conf, String paramName, String expectedValu
 
   /* Thread simulating a user session in HiveServer2. */
   public class SessionThread implements Runnable {
+    private final SessionState ss;
+
+    private SessionThread(SessionState ss) {
+      this.ss = ss;
+    }
 
     @Override
     public void run() {
       try {
+        SessionState.setCurrentSessionState(ss);
         Random random = new Random(Thread.currentThread().getId());
         String threadName = Thread.currentThread().getName();
         System.out.println(threadName + " started.");
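SessionThread now receives the creating thread's SessionState because SessionState is stored in a thread local: without the explicit hand-off, SessionState.get() would return null on the worker thread and getNewSparkSessionId() would have nothing to increment. A minimal sketch of that hand-off (assumes a Hive test classpath; the class name is illustrative):

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.session.SessionState;

    // SessionState.get() is thread-local, so the worker attaches the creating
    // thread's SessionState before touching Spark sessions, much as the updated
    // SessionThread does via setCurrentSessionState(ss).
    public class SessionHandOffSketch {
      public static void main(String[] args) throws InterruptedException {
        final SessionState parent = SessionState.start(new HiveConf());
        Thread worker = new Thread(new Runnable() {
          @Override
          public void run() {
            SessionState.setCurrentSessionState(parent); // without this, SessionState.get() is null here
            System.out.println(SessionState.get() == parent); // prints true
          }
        });
        worker.start();
        worker.join();
      }
    }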