diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java index c97f595..2b2da67 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java @@ -78,7 +78,7 @@ public void run() { try { for (TezSessionState s: TezSessionState.getOpenSessions()) { System.err.println("Shutting down tez session."); - s.close(false); + TezSessionPoolManager.getInstance().close(s); } } catch (Exception e) { // ignore diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java index 0308497..d4f6a6a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java @@ -262,9 +262,17 @@ public TezSessionState getSession(TezSessionState session, HiveConf conf, } if (session != null) { - session.close(false); + close(session); } return getSession(conf, doOpen, forceCreate); } + + public void closeAndOpen(TezSessionState sessionState, HiveConf conf) + throws Exception { + HiveConf sessionConf = sessionState.getConf(); + conf.set("tez.queue.name", sessionConf.get("tez.queue.name")); + close(sessionState); + sessionState.open(conf); + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index b351d13..84cfca4 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@ -18,8 +18,6 @@ package org.apache.hadoop.hive.ql.exec.tez; -import java.io.IOException; -import java.net.URISyntaxException; import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; @@ -28,8 +26,6 @@ import java.util.Map; import java.util.Set; -import javax.security.auth.login.LoginException; - import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; @@ -39,7 +35,6 @@ import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.log.PerfLogger; -import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.BaseWork; import org.apache.hadoop.hive.ql.plan.TezEdgeProperty; import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType; @@ -57,7 +52,6 @@ import org.apache.tez.dag.api.Edge; import org.apache.tez.dag.api.GroupInputEdge; import org.apache.tez.dag.api.SessionNotRunning; -import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.Vertex; import org.apache.tez.dag.api.VertexGroup; import org.apache.tez.dag.api.client.DAGClient; @@ -297,8 +291,7 @@ DAG build(JobConf conf, TezWork work, Path scratchDir, DAGClient submit(JobConf conf, DAG dag, Path scratchDir, LocalResource appJarLr, TezSessionState sessionState) - throws IOException, TezException, InterruptedException, - LoginException, URISyntaxException, HiveException { + throws Exception { perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_SUBMIT_DAG); DAGClient dagClient = null; @@ -310,11 +303,7 @@ DAGClient submit(JobConf conf, DAG dag, Path scratchDir, console.printInfo("Tez session was closed. Reopening..."); // close the old one, but keep the tmp files around - sessionState.close(true); - - // (re)open the session - sessionState.open(this.conf); - + TezSessionPoolManager.getInstance().closeAndOpen(sessionState, this.conf); console.printInfo("Session re-established."); dagClient = sessionState.getSession().submitDAG(dag); diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java index 1793b58..1af92cf 100644 --- ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java +++ ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java @@ -29,14 +29,11 @@ import static org.mockito.Mockito.when; import java.io.IOException; -import java.net.URISyntaxException; import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import javax.security.auth.login.LoginException; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -61,7 +58,6 @@ import org.apache.tez.dag.api.EdgeProperty; import org.apache.tez.dag.api.ProcessorDescriptor; import org.apache.tez.dag.api.SessionNotRunning; -import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.Vertex; import org.apache.tez.dag.api.client.DAGClient; import org.junit.After; @@ -202,8 +198,7 @@ public void testEmptyWork() throws IllegalArgumentException, IOException, Except } @Test - public void testSubmit() throws LoginException, IllegalArgumentException, - IOException, TezException, InterruptedException, URISyntaxException, HiveException { + public void testSubmit() throws Exception { DAG dag = new DAG("test"); task.submit(conf, dag, path, appLr, sessionState); // validate close/reopen