diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java
index 461f359..a20ff1c 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java
@@ -66,4 +66,9 @@
    * Close session and release resources
    */
   public void close();
+
+  /**
+   * Return the user who opened this session.
+   */
+  public String getUser();
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
index 12e0e28..8d86eb8 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
@@ -29,6 +29,7 @@
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
+import org.apache.hadoop.security.UserGroupInformation;
 
 import org.apache.spark.SparkConf;
 import org.apache.spark.util.Utils;
@@ -43,6 +44,7 @@
   private boolean isOpen;
   private final String sessionId;
   private HiveSparkClient hiveSparkClient;
+  private String user;
 
   public SparkSessionImpl() {
     sessionId = makeSessionId();
@@ -52,7 +54,11 @@ public SparkSessionImpl() {
   public void open(HiveConf conf) throws HiveException {
     this.conf = conf;
     isOpen = true;
     try {
+      // Record the session owner so the manager can later decide whether the session is reusable.
+      UserGroupInformation ugi = org.apache.hadoop.hive.shims.Utils.getUGI();
+      user = ugi.getShortUserName();
+      LOG.info("User of session id " + sessionId + " is " + user);
       hiveSparkClient = HiveSparkClientFactory.createHiveSparkClient(conf);
     } catch (Exception e) {
       throw new HiveException("Failed to create spark client.", e);
@@ -110,6 +116,10 @@ public void close() {
     }
     hiveSparkClient = null;
   }
+
+  public String getUser() {
+    return user;
+  }
 
   public static String makeSessionId() {
     return UUID.randomUUID().toString();
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java
index 2e80383..ecda052 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java
@@ -102,7 +102,7 @@ public SparkSession getSession(SparkSession existingSession, HiveConf conf,
     setup(conf);
 
     if (existingSession != null) {
-      if (canReuseSession(existingSession, conf)) {
+      if (canReuseSession(existingSession)) {
         // Open the session if it is closed.
         if (!existingSession.isOpen() && doOpen) {
           existingSession.open(conf);
@@ -137,17 +137,17 @@ public SparkSession getSession(SparkSession existingSession, HiveConf conf,
   /**
-   * Currently we only match the userNames in existingSession conf and given conf.
+   * Currently we only check that the existing session was opened by the same user as the current one.
    */
-  private boolean canReuseSession(SparkSession existingSession, HiveConf conf) throws HiveException {
+  private boolean canReuseSession(SparkSession existingSession) throws HiveException {
     try {
       UserGroupInformation newUgi = Utils.getUGI();
       String newUserName = newUgi.getShortUserName();
-
-      // TODOD this we need to store the session username somewhere else as getUGIForConf never used the conf
-      UserGroupInformation ugiInSession = Utils.getUGI();
-      String userNameInSession = ugiInSession.getShortUserName();
-
-      return newUserName.equals(userNameInSession);
-    } catch(Exception ex) {
-      throw new HiveException("Failed to get user info from HiveConf.", ex);
+      if (!newUserName.equals(existingSession.getUser())) {
+        LOG.info("Cannot reuse session: current user " + newUserName
+            + " differs from session user " + existingSession.getUser());
+        return false;
+      }
+      return true;
+    } catch (Exception ex) {
+      throw new HiveException("Failed to get the current user.", ex);
     }
   }
 }
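Not part of the patch: the sketch below only illustrates the reuse semantics the change is after. getInstance(), the three-argument getSession(), closeSession() and shutdown() are the existing SparkSessionManager APIs touched or relied on above; the driver class itself is hypothetical and assumes a single logged-in user throughout.

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession;
import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManager;
import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl;

// Illustrative sketch only: exercises the user-aware session reuse
// that canReuseSession() implements after this patch.
public class SessionReuseSketch {
  public static void main(String[] args) throws Exception {
    HiveConf conf = new HiveConf();
    SparkSessionManager manager = SparkSessionManagerImpl.getInstance();

    // No existing session: a new one is created and the current UGI short
    // user name is recorded in it by SparkSessionImpl.open().
    SparkSession first = manager.getSession(null, conf, true);

    // The same user asks again with the existing session: canReuseSession()
    // matches the stored user against the current UGI, so the same instance
    // comes back instead of a fresh one. A different logged-in user would
    // fail the check and receive a new session.
    SparkSession second = manager.getSession(first, conf, true);
    assert second == first && first.getUser().equals(second.getUser());

    manager.closeSession(second);
    manager.shutdown();
  }
}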