diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index db942b0..887a323 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -81,18 +81,12 @@ private static final Map metaConfs = new HashMap(); private final List restrictList = new ArrayList(); private final Set hiddenSet = new HashSet(); + private transient final Map updatedSparkConf = new HashMap(); private Pattern modWhiteListPattern = null; - private volatile boolean isSparkConfigUpdated = false; private static final int LOG_PREFIX_LENGTH = 64; - public boolean getSparkConfigUpdated() { - return isSparkConfigUpdated; - } - public void setSparkConfigUpdated(boolean isSparkConfigUpdated) { - this.isSparkConfigUpdated = isSparkConfigUpdated; - } static { ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); @@ -2659,7 +2653,11 @@ public void verifyAndSet(String name, String value) throws IllegalArgumentExcept // When either name or value is null, the set method below will fail, // and throw IllegalArgumentException set(name, value); - isSparkConfigUpdated = isSparkRelatedConfig(name); + if (isSparkRelatedConfig(name)) { + synchronized (updatedSparkConf) { + updatedSparkConf.put(name, value); + } + } } } @@ -2922,7 +2920,6 @@ public HiveConf(HiveConf other) { super(other); hiveJar = other.hiveJar; auxJars = other.auxJars; - isSparkConfigUpdated = other.isSparkConfigUpdated; origProp = (Properties)other.origProp.clone(); restrictList.addAll(other.restrictList); hiddenSet.addAll(other.hiddenSet); @@ -3415,4 +3412,17 @@ public static String generateMrDeprecationWarning() { + "Consider using a different execution engine (i.e. " + HiveConf.getNonMrEngines() + ") or using Hive 1.X releases."; } + + /** + * fetch newly updated spark related conf. this method is not idempotent + * @return + */ + public Map pullUpdatedSparkConf() { + Map sparkConf = new HashMap(); + synchronized (updatedSparkConf){ + sparkConf.putAll(updatedSparkConf); + updatedSparkConf.clear(); + } + return sparkConf; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java index 0268469..9430225 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java @@ -22,6 +22,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.util.Collection; +import java.util.Map; import com.google.common.base.Preconditions; import org.apache.commons.io.FilenameUtils; @@ -120,17 +121,24 @@ public static boolean isDedicatedCluster(Configuration conf) { public static SparkSession getSparkSession(HiveConf conf, SparkSessionManager sparkSessionManager) throws HiveException { - SparkSession sparkSession = SessionState.get().getSparkSession(); - - // Spark configurations are updated close the existing session - if (conf.getSparkConfigUpdated()) { - sparkSessionManager.closeSession(sparkSession); - sparkSession = null; - conf.setSparkConfigUpdated(false); + try { + SessionState.get().getSparkSessionLock().lock(); + SparkSession sparkSession = SessionState.get().getSparkSession(); + Map updatedSparkConf = SessionState.get().getConf().pullUpdatedSparkConf(); + // Spark configurations are updated close the existing session + if (!updatedSparkConf.isEmpty()) { + sparkSessionManager.closeSession(sparkSession); + sparkSession = null; + for (Map.Entry sparkConf : updatedSparkConf.entrySet()) { + conf.set(sparkConf.getKey(), sparkConf.getValue()); + } + } + sparkSession = sparkSessionManager.getSession(sparkSession, conf, true); + SessionState.get().setSparkSession(sparkSession); + return sparkSession; + } finally { + SessionState.get().getSparkSessionLock().unlock(); } - sparkSession = sparkSessionManager.getSession(sparkSession, conf, true); - SessionState.get().setSparkSession(sparkSession); - return sparkSession; } diff --git ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java index 5c69fb6..3046f58 100644 --- ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java +++ ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java @@ -115,6 +115,9 @@ // Session-scope compile lock. private final ReentrantLock compileLock = new ReentrantLock(); + // Session-scope spark session lock. + private final ReentrantLock sparkSessionLock = new ReentrantLock(); + /** * current configuration. */ @@ -329,6 +332,10 @@ public ReentrantLock getCompileLock() { return compileLock; } + public ReentrantLock getSparkSessionLock() { + return sparkSessionLock; + } + public boolean getIsVerbose() { return isVerbose; }