diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index db942b0..bc6ff6d 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2659,7 +2659,9 @@ public void verifyAndSet(String name, String value) throws IllegalArgumentExcept // When either name or value is null, the set method below will fail, // and throw IllegalArgumentException set(name, value); - isSparkConfigUpdated = isSparkRelatedConfig(name); + if (isSparkRelatedConfig(name)) { + isSparkConfigUpdated = true; + } } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java index 0268469..987e650 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java @@ -121,14 +121,20 @@ public static boolean isDedicatedCluster(Configuration conf) { public static SparkSession getSparkSession(HiveConf conf, SparkSessionManager sparkSessionManager) throws HiveException { SparkSession sparkSession = SessionState.get().getSparkSession(); - - // Spark configurations are updated close the existing session - if (conf.getSparkConfigUpdated()) { - sparkSessionManager.closeSession(sparkSession); - sparkSession = null; - conf.setSparkConfigUpdated(false); + HiveConf sessionConf = SessionState.get().getConf(); + + try { + SessionState.get().getConfLock().lock(); + // Spark configurations are updated close the existing session + if (sessionConf.getSparkConfigUpdated()) { + sparkSessionManager.closeSession(sparkSession); + sparkSession = null; + sessionConf.setSparkConfigUpdated(false); + } + } finally { + SessionState.get().getConfLock().unlock(); } - sparkSession = sparkSessionManager.getSession(sparkSession, conf, true); + sparkSession = sparkSessionManager.getSession(sparkSession, sessionConf, true); SessionState.get().setSparkSession(sparkSession); return sparkSession; } diff --git ql/src/java/org/apache/hadoop/hive/ql/processors/SetProcessor.java ql/src/java/org/apache/hadoop/hive/ql/processors/SetProcessor.java index 9a3ba04..15b9289 100644 --- ql/src/java/org/apache/hadoop/hive/ql/processors/SetProcessor.java +++ ql/src/java/org/apache/hadoop/hive/ql/processors/SetProcessor.java @@ -206,7 +206,12 @@ private static String setConf(String varname, String key, String varvalue, boole throw new IllegalArgumentException("hive configuration " + key + " does not exists."); } } - conf.verifyAndSet(key, value); + try { + SessionState.get().getConfLock().lock(); + conf.verifyAndSet(key, value); + } finally { + SessionState.get().getConfLock().unlock(); + } if (HiveConf.ConfVars.HIVE_EXECUTION_ENGINE.varname.equals(key)) { if (!"spark".equals(value)) { SessionState.get().closeSparkSession(); diff --git ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java index 5c69fb6..94b58e5 100644 --- ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java +++ ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java @@ -120,6 +120,9 @@ */ private final HiveConf conf; + // Session-scope conf lock. + private final ReentrantLock confLock = new ReentrantLock(); + /** * silent mode. */ @@ -325,6 +328,10 @@ public void setIsSilent(boolean isSilent) { this.isSilent = isSilent; } + public ReentrantLock getConfLock() { + return confLock; + } + public ReentrantLock getCompileLock() { return compileLock; }