Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java	(revision 1615859)
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java	(working copy)
@@ -72,8 +72,17 @@
   private boolean isWhiteListRestrictionEnabled = false;
   private final List<String> modWhiteList = new ArrayList<String>();
 
+  private boolean isSparkConfigUpdated = false;
+  public boolean getSparkConfigUpdated() {
+    return isSparkConfigUpdated;
+  }
+
+  public void setSparkConfigUpdated(boolean isSparkConfigUpdated) {
+    this.isSparkConfigUpdated = isSparkConfigUpdated;
+  }
+
   static {
     ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
     if (classLoader == null) {
@@ -1917,6 +1926,7 @@
       throw new IllegalArgumentException("Cannot modify " + name + " at runtime. It is in the list"
          + "of parameters that can't be modified at runtime");
     }
+    isSparkConfigUpdated = isSparkConfigUpdated || name.startsWith("spark");
     set(name, value);
   }
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java	(revision 1615859)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java	(working copy)
@@ -49,7 +49,7 @@
   private static final String SAPRK_DEFAULT_APP_NAME = "Hive on Spark";
 
   private static SparkClient client;
-
+
   public static synchronized SparkClient getInstance(Configuration hiveConf) {
     if (client == null) {
       client = new SparkClient(hiveConf);
@@ -220,4 +220,14 @@
       }
     }
   }
+
+  public void close() {
+    // Clear the cached singleton so getInstance() can build a fresh client.
+    synchronized (SparkClient.class) {
+      if (client == this) {
+        client = null;
+      }
+    }
+    sc.stop();
+  }
 }
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java	(revision 1615859)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java	(working copy)
@@ -20,6 +20,9 @@
 
 import java.util.List;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Task;
@@ -27,10 +30,12 @@
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
+import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.util.StringUtils;
 
 public class SparkTask extends Task<SparkWork> {
   private static final long serialVersionUID = 1L;
+  protected static transient final Log LOG = LogFactory.getLog(SparkTask.class);
 
   @Override
   public int execute(DriverContext driverContext) {
@@ -37,11 +42,30 @@
     int rc = 1;
     SparkClient client = null;
     try {
-      client = SparkClient.getInstance(driverContext.getCtx().getConf());
+      // Get the session.
+      SessionState sessionState = SessionState.get();
+
+      // Get the configuration from the session.
+      HiveConf conf = sessionState.getConf();
+
+      // Get the cached Spark client from the session, if any.
+      client = sessionState.getSparkClient();
+      if (conf.getSparkConfigUpdated() && client != null) {
+        client.close();
+        client = null;
+      }
+      if (client == null) {
+        client = SparkClient.getInstance(conf);
+        sessionState.setSparkClient(client);
+        conf.setSparkConfigUpdated(false);
+      }
       rc = client.execute(driverContext, getWork());
+    } catch (Exception e) {
+      LOG.error("Failed to execute spark job.", e);
+      // rc will be 1 at this point, indicating failure.
     } finally {
       if (client != null) {
-        rc = close(rc);
+        rc = close(rc);
       }
     }
     return rc;
Index: ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java	(revision 1615859)
+++ ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java	(working copy)
@@ -47,6 +47,7 @@
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.MapRedStats;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.spark.SparkClient;
 import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager;
 import org.apache.hadoop.hive.ql.exec.tez.TezSessionState;
 import org.apache.hadoop.hive.ql.history.HiveHistory;
@@ -201,6 +202,8 @@
    * @return Path for local scratch directory for current session
    */
   private Path localSessionPath;
+
+  private SparkClient sparkClient;
 
   /**
    * Get the lineage state stored in this session.
@@ -1062,6 +1065,11 @@
     } finally {
       tezSessionState = null;
     }
+
+    if (sparkClient != null) {
+      sparkClient.close();
+      sparkClient = null;
+    }
 
     dropSessionPaths(conf);
   }
@@ -1153,4 +1161,12 @@
     this.userIpAddress = userIpAddress;
   }
 
+  public SparkClient getSparkClient() {
+    return sparkClient;
+  }
+
+  public void setSparkClient(SparkClient sparkClient) {
+    this.sparkClient = sparkClient;
+  }
+
 }
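
For reviewers, a minimal self-contained sketch (not part of the patch) of the lifecycle the SparkTask change wires up: the session caches one client, rebuilds it when a spark* property changes, and releases it when the session closes. Client, Session, SessionClientSketch, and the Supplier factory are illustrative stand-ins, not Hive APIs, and the config-updated flag, which the patch keeps on HiveConf, lives on Session here for brevity.

import java.util.function.Supplier;

public class SessionClientSketch {

  // Stand-in for SparkClient: the only operation we model is close().
  interface Client extends AutoCloseable {
    @Override
    void close();
  }

  // Stand-in for SessionState: owns at most one client at a time.
  static class Session {
    private Client client;          // cached per-session client
    private boolean configUpdated;  // set when a spark* property changes

    void setConfigUpdated(boolean updated) {
      configUpdated = updated;
    }

    // Get-or-create: rebuild the client if the Spark config changed.
    Client getOrCreateClient(Supplier<Client> factory) {
      if (configUpdated && client != null) {
        client.close();             // stale config: drop the old client
        client = null;
      }
      if (client == null) {
        client = factory.get();
        configUpdated = false;      // the new client reflects current config
      }
      return client;
    }

    // Mirrors SessionState.close(): release the client with the session.
    void close() {
      if (client != null) {
        client.close();
        client = null;
      }
    }
  }

  public static void main(String[] args) {
    Session session = new Session();
    Supplier<Client> factory = () -> new Client() {
      @Override
      public void close() {
        System.out.println("client closed");
      }
    };

    Client first = session.getOrCreateClient(factory);
    Client again = session.getOrCreateClient(factory);
    System.out.println("reused: " + (first == again));    // true

    session.setConfigUpdated(true);                       // e.g. "set spark.master=..."
    Client rebuilt = session.getOrCreateClient(factory);
    System.out.println("rebuilt: " + (rebuilt != first)); // true

    session.close();
  }
}

The design choice the sketch mirrors is that the session, not a global singleton, owns the client's lifetime, which is what lets a per-session "set spark.*" take effect without restarting the service.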