commit 253595ea504fe19223c0f2b7e1552a4e31211dee
Author: Sahil Takiar
Date:   Sun Apr 1 21:21:43 2018 -0500

    Set HoS session id to an incrementing integer instead of a UUID

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
index 5ed5d4214e..0aae0d8205 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
@@ -62,8 +62,9 @@
   @VisibleForTesting
   public static final String SPARK_CLONE_CONFIGURATION = "spark.hadoop.cloneConf";
 
-  public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf, String sessionId) throws Exception {
-    Map<String, String> sparkConf = initiateSparkConf(hiveconf, sessionId);
+  public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf, String sparkSessionId,
+      String hiveSessionId) throws Exception {
+    Map<String, String> sparkConf = initiateSparkConf(hiveconf, hiveSessionId);
     // Submit spark job through local spark context while spark master is local mode, otherwise submit
     // spark job through remote spark context.
@@ -72,11 +73,11 @@ public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf, String se
       // With local spark context, all user sessions share the same spark context.
       return LocalHiveSparkClient.getInstance(generateSparkConf(sparkConf), hiveconf);
     } else {
-      return new RemoteHiveSparkClient(hiveconf, sparkConf, sessionId);
+      return new RemoteHiveSparkClient(hiveconf, sparkConf, hiveSessionId + "_" + sparkSessionId);
     }
   }
 
-  public static Map<String, String> initiateSparkConf(HiveConf hiveConf, String sessionId) {
+  public static Map<String, String> initiateSparkConf(HiveConf hiveConf, String hiveSessionId) {
     Map<String, String> sparkConf = new HashMap<String, String>();
 
     HBaseConfiguration.addHbaseResources(hiveConf);
@@ -84,9 +85,9 @@ public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf, String se
     sparkConf.put("spark.master", SPARK_DEFAULT_MASTER);
     final String appNameKey = "spark.app.name";
     String appName = hiveConf.get(appNameKey);
-    final String sessionIdString = " (sessionId = " + sessionId + ")";
+    final String sessionIdString = " (hiveSessionId = " + hiveSessionId + ")";
     if (appName == null) {
-      if (sessionId == null) {
+      if (hiveSessionId == null) {
         appName = SPARK_DEFAULT_APP_NAME;
       } else {
         appName = SPARK_DEFAULT_APP_NAME + sessionIdString;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
index c8cb1ac08c..bedb14c403 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
@@ -72,8 +72,8 @@
   private Path scratchDir;
   private final Object dirLock = new Object();
 
-  public SparkSessionImpl() {
-    sessionId = makeSessionId();
+  SparkSessionImpl(String sessionId) {
+    this.sessionId = sessionId;
     initErrorPatterns();
   }
 
@@ -83,7 +83,8 @@ public void open(HiveConf conf) throws HiveException {
     this.conf = conf;
     isOpen = true;
     try {
-      hiveSparkClient = HiveSparkClientFactory.createHiveSparkClient(conf, sessionId);
+      hiveSparkClient = HiveSparkClientFactory.createHiveSparkClient(conf, sessionId,
+          SessionState.get().getSessionId());
     } catch (Throwable e) {
       // It's possible that user session is closed while creating Spark client.
       HiveException he;
@@ -258,10 +259,6 @@ public Path getHDFSSessionDir() throws IOException {
     return scratchDir;
   }
 
-  public static String makeSessionId() {
-    return UUID.randomUUID().toString();
-  }
-
   @VisibleForTesting
   HiveSparkClient getHiveSparkClient() {
     return hiveSparkClient;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java
index 46cee0d903..68c9e045f1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java
@@ -24,6 +24,7 @@
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hive.common.util.ShutdownHookManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -110,7 +111,7 @@ public SparkSession getSession(SparkSession existingSession, HiveConf conf, bool
       return existingSession;
     }
 
-    SparkSession sparkSession = new SparkSessionImpl();
+    SparkSession sparkSession = new SparkSessionImpl(SessionState.get().getNewSparkSessionId());
     if (doOpen) {
       sparkSession.open(conf);
     }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
index 83490d2d53..ce6e3b4f15 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
@@ -42,6 +42,7 @@
 import java.util.UUID;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.commons.lang.StringUtils;
@@ -309,6 +310,8 @@
   private List<Closeable> cleanupItems = new LinkedList<Closeable>();
 
+  private final AtomicLong sparkSessionId = new AtomicLong();
+
   public HiveConf getConf() {
     return sessionConf;
   }
@@ -2037,6 +2040,9 @@ public void addCleanupItem(Closeable item) {
     return currentFunctionsInUse;
   }
 
+  public String getNewSparkSessionId() {
+    return Long.toString(this.sparkSessionId.getAndIncrement());
+  }
 }
 
 class ResourceMaps {
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java
index 15756da186..0445dd6ab6 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java
@@ -17,13 +17,17 @@
  */
 package org.apache.hadoop.hive.ql.exec.spark.session;
 
-import org.apache.hadoop.hive.ql.ErrorMsg;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.util.StringUtils;
+
+import org.junit.BeforeClass;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.List;
@@ -34,6 +38,7 @@
 import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClientFactory;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.spark.SparkConf;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
@@ -41,10 +46,17 @@ import static org.junit.Assert.fail;
 
 public class TestSparkSessionManagerImpl {
+  private static final Logger LOG = LoggerFactory.getLogger(TestSparkSessionManagerImpl.class);
 
   private SparkSessionManagerImpl sessionManagerHS2 = null;
   private boolean anyFailedSessionThread; // updated only when a thread has failed.
 
+  private static HiveConf SESSION_HIVE_CONF = new HiveConf();
+
+  @BeforeClass
+  public static void setup() {
+    SessionState.start(SESSION_HIVE_CONF);
+  }
 
   /** Tests CLI scenario where we get a single session and use it multiple times. */
@@ -82,7 +94,7 @@ public void testMultiSessionMultipleUse() throws Exception {
     List<Thread> threadList = new ArrayList<Thread>();
     for (int i = 0; i < 10; i++) {
-      Thread t = new Thread(new SessionThread(), "Session thread " + i);
+      Thread t = new Thread(new SessionThread(SessionState.get()), "Session thread " + i);
       t.start();
       threadList.add(t);
     }
@@ -184,6 +196,23 @@ public void testGetHiveException() throws Exception {
     checkHiveException(ss, e, ErrorMsg.SPARK_CREATE_CLIENT_ERROR, "Other exception");
   }
 
+  @Test
+  public void testGetSessionId() throws HiveException {
+    SessionState ss = SessionState.start(SESSION_HIVE_CONF);
+    SparkSessionManager ssm = SparkSessionManagerImpl.getInstance();
+
+    ss.setSparkSession(ssm.getSession(null, SESSION_HIVE_CONF, true));
+    assertEquals("0", ss.getSparkSession().getSessionId());
+
+    ss.setSparkSession(ssm.getSession(null, SESSION_HIVE_CONF, true));
+    assertEquals("1", ss.getSparkSession().getSessionId());
+
+    ss = SessionState.start(SESSION_HIVE_CONF);
+
+    ss.setSparkSession(ssm.getSession(null, SESSION_HIVE_CONF, true));
+    assertEquals("0", ss.getSparkSession().getSessionId());
+  }
+
   private void checkHiveException(SparkSessionImpl ss, Throwable e, ErrorMsg expectedErrMsg) {
     checkHiveException(ss, e, expectedErrMsg, null);
   }
@@ -219,10 +248,16 @@ private void checkSparkConf(HiveConf conf, String paramName, String expectedValu
 
   /* Thread simulating a user session in HiveServer2. */
   public class SessionThread implements Runnable {
+    private final SessionState ss;
+
+    private SessionThread(SessionState ss) {
+      this.ss = ss;
+    }
 
     @Override
     public void run() {
       try {
+        SessionState.setCurrentSessionState(ss);
        Random random = new Random(Thread.currentThread().getId());
        String threadName = Thread.currentThread().getName();
        System.out.println(threadName + " started.");
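
For readers skimming the patch, here is a minimal, self-contained sketch of the id scheme it introduces. The FakeHiveSession class below is an illustrative stand-in, not Hive code: each Hive session owns its own AtomicLong, so Spark session ids count up from 0 per Hive session (as testGetSessionId asserts), and the key handed to RemoteHiveSparkClient is the two ids joined with an underscore.

import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;

public class SessionIdSchemeDemo {

  // Illustrative stand-in for Hive's SessionState; not actual Hive code.
  static class FakeHiveSession {
    // A Hive session's own id is a UUID string; this patch does not change that.
    private final String hiveSessionId = UUID.randomUUID().toString();
    // Mirrors the new SessionState#sparkSessionId field: one counter per Hive session.
    private final AtomicLong sparkSessionId = new AtomicLong();

    String getSessionId() {
      return hiveSessionId;
    }

    // Mirrors SessionState#getNewSparkSessionId(): returns "0", "1", "2", ...
    String getNewSparkSessionId() {
      return Long.toString(sparkSessionId.getAndIncrement());
    }
  }

  public static void main(String[] args) {
    FakeHiveSession hive = new FakeHiveSession();
    for (int i = 0; i < 2; i++) {
      // The key passed to RemoteHiveSparkClient is "<hiveSessionId>_<sparkSessionId>".
      System.out.println(hive.getSessionId() + "_" + hive.getNewSparkSessionId());
    }
    // A fresh Hive session restarts the Spark counter at 0.
    FakeHiveSession another = new FakeHiveSession();
    System.out.println(another.getNewSparkSessionId()); // prints 0
  }
}

The design trade-off: small per-session integers keep the Spark app name and logs readable, while prefixing the Hive session's UUID keeps the combined client key unique across concurrent sessions.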