Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java	(revision 1619503)
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java	(working copy)
@@ -75,8 +75,17 @@
   private boolean isWhiteListRestrictionEnabled = false;
   private final List<String> modWhiteList = new ArrayList<String>();
 
+  private boolean isSparkConfigUpdated = false;
+
+  public boolean getSparkConfigUpdated() {
+    return isSparkConfigUpdated;
+  }
+
+  public void setSparkConfigUpdated(boolean isSparkConfigUpdated) {
+    this.isSparkConfigUpdated = isSparkConfigUpdated;
+  }
+
   static {
     ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
     if (classLoader == null) {
@@ -1947,6 +1956,7 @@
       throw new IllegalArgumentException("Cannot modify " + name + " at runtime. It is in the list"
           + "of parameters that can't be modified at runtime");
     }
+    isSparkConfigUpdated = name.startsWith("spark");
     set(name, value);
   }

Index: ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java	(revision 1619503)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java	(working copy)
@@ -33,6 +33,7 @@
   private HiveConf conf;
   private boolean isOpen;
   private final String sessionId;
+  private SparkClient sparkClient;
 
   public SparkSessionImpl() {
     sessionId = makeSessionId();
@@ -47,8 +48,8 @@
   @Override
   public int submit(DriverContext driverContext, SparkWork sparkWork) {
     Preconditions.checkState(isOpen, "Session is not open. Can't submit jobs.");
-    return SparkClient.getInstance(driverContext.getCtx().getConf())
-        .execute(driverContext, sparkWork);
+    sparkClient = SparkClient.getInstance(driverContext.getCtx().getConf());
+    return sparkClient.execute(driverContext, sparkWork);
   }
 
   @Override
@@ -69,6 +70,10 @@
   @Override
   public void close() {
     isOpen = false;
+    if (sparkClient != null) {
+      sparkClient.close();
+    }
+    sparkClient = null;
   }
 
   public static String makeSessionId() {

Index: ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java	(revision 1619503)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java	(working copy)
@@ -225,4 +225,9 @@
       }
     }
   }
+
+  public void close() {
+    sc.stop();
+    client = null;
+  }
 }

Index: ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java	(revision 1619503)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java	(working copy)
@@ -62,9 +62,15 @@
       configureNumberOfReducers();
       sparkSessionManager = SparkSessionManagerImpl.getInstance();
       sparkSession = SessionState.get().getSparkSession();
+
+      // Spark configurations are updated; close the existing session so they take effect
+      if (conf.getSparkConfigUpdated()) {
+        sparkSessionManager.closeSession(sparkSession);
+        sparkSession = null;
+        conf.setSparkConfigUpdated(false);
+      }
       sparkSession = sparkSessionManager.getSession(sparkSession, conf, true);
       SessionState.get().setSparkSession(sparkSession);
-
       rc = sparkSession.submit(driverContext, getWork());
     } catch (Exception e) {
       LOG.error("Failed to execute spark task.", e);
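
For context, below is a minimal, self-contained sketch of the lifecycle this patch implements: a dirty flag set whenever a spark.* property is written, and a task loop that closes and recreates the session when it observes the flag. Every class here (Conf, Session, SessionManager, SessionRecycleSketch) is a hypothetical stand-in written for this note, not a Hive API; it only mirrors the control flow of the HiveConf/SparkTask/SparkSessionImpl changes above.

// Hypothetical stand-ins mirroring the patch's control flow; not Hive classes.
import java.util.concurrent.atomic.AtomicInteger;

public class SessionRecycleSketch {

  /** Stand-in for HiveConf: tracks whether a spark.* property was modified. */
  static class Conf {
    private boolean sparkConfigUpdated = false;

    void verifyAndSet(String name, String value) {
      // Mirrors the patch: the flag is overwritten on every call.
      sparkConfigUpdated = name.startsWith("spark");
    }

    boolean getSparkConfigUpdated() { return sparkConfigUpdated; }
    void setSparkConfigUpdated(boolean updated) { sparkConfigUpdated = updated; }
  }

  /** Stand-in for SparkSessionImpl: owns a client that must be shut down. */
  static class Session {
    final int id;
    Session(int id) { this.id = id; }
    void close() { System.out.println("closed session " + id); }
  }

  /** Stand-in for SparkSessionManagerImpl. */
  static class SessionManager {
    private final AtomicInteger ids = new AtomicInteger();

    Session getSession(Session existing) {
      // Reuse the existing session, or create a fresh one from current conf.
      return existing != null ? existing : new Session(ids.incrementAndGet());
    }

    void closeSession(Session session) {
      if (session != null) {
        session.close();
      }
    }
  }

  /** Mirrors the SparkTask.execute() fragment in the patch. */
  static Session runTask(Conf conf, SessionManager manager, Session session) {
    if (conf.getSparkConfigUpdated()) {
      manager.closeSession(session);
      session = null;
      conf.setSparkConfigUpdated(false);
    }
    session = manager.getSession(session);
    System.out.println("submitted work on session " + session.id);
    return session;
  }

  public static void main(String[] args) {
    Conf conf = new Conf();
    SessionManager manager = new SessionManager();

    Session session = runTask(conf, manager, null);    // creates session 1
    session = runTask(conf, manager, session);         // reuses session 1

    conf.verifyAndSet("spark.executor.memory", "4g");  // marks conf dirty
    session = runTask(conf, manager, session);         // closes 1, creates 2
  }
}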
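Two details of the real patch worth double-checking against this sketch: verifyAndSet() assigns name.startsWith("spark") unconditionally, so setting any non-spark parameter afterwards clears a pending update before SparkTask can act on it (an OR-accumulating flag would avoid that); and SparkTask may pass a null sparkSession to closeSession(), which SparkSessionManagerImpl must tolerate, as the stand-in closeSession() above does with its null guard.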