diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java index 2ba75e0..5d6a02c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java @@ -54,7 +54,7 @@ public static HiveSparkClient createHiveSparkClient(Configuration configuration) } } - private static Map initiateSparkConf(Configuration hiveConf) { + public static Map initiateSparkConf(Configuration hiveConf) { Map sparkConf = new HashMap(); // set default spark configurations. diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java index f46c1b4..4943322 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java @@ -76,7 +76,6 @@ private transient List localFiles = new ArrayList(); RemoteHiveSparkClient(Map conf) throws IOException, SparkException { - SparkClientFactory.initialize(conf); sparkConf = HiveSparkClientFactory.generateSparkConf(conf); remoteClient = SparkClientFactory.createClient(conf); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java index d77866a..b6abee5 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java @@ -21,15 +21,19 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClientFactory; import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.hive.shims.Utils; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hive.spark.client.SparkClientFactory; +import java.io.IOException; import java.util.Collections; import java.util.HashSet; import java.util.Iterator; +import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; /** * Simple implementation of SparkSessionManager @@ -41,7 +45,7 @@ private static final Log LOG = LogFactory.getLog(SparkSessionManagerImpl.class); private Set createdSessions; - private boolean inited; + private AtomicBoolean inited = new AtomicBoolean(false); private static SparkSessionManagerImpl instance; @@ -74,13 +78,16 @@ private SparkSessionManagerImpl() { @Override public void setup(HiveConf hiveConf) throws HiveException { - LOG.info("Setting up the session manager."); - init(); - } - - private void init() { - createdSessions = Collections.synchronizedSet(new HashSet()); - inited = true; + if (inited.compareAndSet(false, true)) { + LOG.info("Setting up the session manager."); + createdSessions = Collections.synchronizedSet(new HashSet()); + Map conf = HiveSparkClientFactory.initiateSparkConf(hiveConf); + try { + SparkClientFactory.initialize(conf); + } catch (IOException e) { + throw new HiveException("Error initializing SparkClientFactory", e); + } + } } /** @@ -92,9 +99,7 @@ private void init() { @Override public SparkSession getSession(SparkSession existingSession, HiveConf conf, boolean doOpen) throws HiveException { - if (!inited) { - init(); - } + setup(conf); if (existingSession != null) { if (canReuseSession(existingSession, conf)) { @@ -178,6 +183,7 @@ public void shutdown() { createdSessions.clear(); } } - inited = false; + inited.set(false); + SparkClientFactory.stop(); } } diff --git spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java index baaf570..6d58042 100644 --- spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java +++ spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java @@ -44,15 +44,17 @@ * @param conf Map containing configuration parameters for the client. */ public static synchronized void initialize(Map conf) throws IOException { - secret = akka.util.Crypt.generateSecureCookie(); + if (!initialized) { + secret = akka.util.Crypt.generateSecureCookie(); - Map akkaConf = Maps.newHashMap(conf); - akkaConf.put(ClientUtils.CONF_KEY_SECRET, secret); + Map akkaConf = Maps.newHashMap(conf); + akkaConf.put(ClientUtils.CONF_KEY_SECRET, secret); - ClientUtils.ActorSystemInfo info = ClientUtils.createActorSystem(akkaConf); - actorSystem = info.system; - akkaUrl = info.url; - initialized = true; + ClientUtils.ActorSystemInfo info = ClientUtils.createActorSystem(akkaConf); + actorSystem = info.system; + akkaUrl = info.url; + initialized = true; + } } /** Stops the SparkClient library. */