diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index db942b0..13b3982 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -81,18 +81,12 @@ private static final Map metaConfs = new HashMap(); private final List restrictList = new ArrayList(); private final Set hiddenSet = new HashSet(); + private transient final Map updatedSparkConf = new HashMap(); private Pattern modWhiteListPattern = null; - private volatile boolean isSparkConfigUpdated = false; private static final int LOG_PREFIX_LENGTH = 64; - public boolean getSparkConfigUpdated() { - return isSparkConfigUpdated; - } - public void setSparkConfigUpdated(boolean isSparkConfigUpdated) { - this.isSparkConfigUpdated = isSparkConfigUpdated; - } static { ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); @@ -2659,7 +2653,11 @@ public void verifyAndSet(String name, String value) throws IllegalArgumentExcept // When either name or value is null, the set method below will fail, // and throw IllegalArgumentException set(name, value); - isSparkConfigUpdated = isSparkRelatedConfig(name); + if (isSparkRelatedConfig(name)) { + synchronized (updatedSparkConf) { + updatedSparkConf.put(name, value); + } + } } } @@ -2922,7 +2920,6 @@ public HiveConf(HiveConf other) { super(other); hiveJar = other.hiveJar; auxJars = other.auxJars; - isSparkConfigUpdated = other.isSparkConfigUpdated; origProp = (Properties)other.origProp.clone(); restrictList.addAll(other.restrictList); hiddenSet.addAll(other.hiddenSet); @@ -3415,4 +3412,17 @@ public static String generateMrDeprecationWarning() { + "Consider using a different execution engine (i.e. " + HiveConf.getNonMrEngines() + ") or using Hive 1.X releases."; } + + /** + * + * @return + */ + public Map pullUpdatedSparkConf() { + Map sparkConf = new HashMap(); + synchronized (updatedSparkConf){ + sparkConf.putAll(updatedSparkConf); + updatedSparkConf.clear(); + } + return sparkConf; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java index 0268469..3d5c22c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java @@ -22,6 +22,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.util.Collection; +import java.util.Map; import com.google.common.base.Preconditions; import org.apache.commons.io.FilenameUtils; @@ -118,15 +119,18 @@ public static boolean isDedicatedCluster(Configuration conf) { return master.startsWith("yarn-") || master.startsWith("local"); } - public static SparkSession getSparkSession(HiveConf conf, + public static synchronized SparkSession getSparkSession(HiveConf conf, SparkSessionManager sparkSessionManager) throws HiveException { SparkSession sparkSession = SessionState.get().getSparkSession(); + Map updatedSparkConf = SessionState.get().getConf().pullUpdatedSparkConf(); // Spark configurations are updated close the existing session - if (conf.getSparkConfigUpdated()) { + if (!updatedSparkConf.isEmpty()) { sparkSessionManager.closeSession(sparkSession); sparkSession = null; - conf.setSparkConfigUpdated(false); + for (Map.Entry sparkConf : updatedSparkConf.entrySet()) { + conf.set(sparkConf.getKey(), sparkConf.getValue()); + } } sparkSession = sparkSessionManager.getSession(sparkSession, conf, true); SessionState.get().setSparkSession(sparkSession);