diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java index c315dd2..7ec926c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java @@ -78,7 +78,7 @@ public void run() { try { for (TezSessionState s: TezSessionState.getOpenSessions()) { System.err.println("Shutting down tez session."); - TezSessionPoolManager.getInstance().close(s); + TezSessionPoolManager.getInstance().close(s, false); } } catch (Exception e) { // ignore diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java index 0d0ac41..c1187ed 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java @@ -168,10 +168,10 @@ public void returnSession(TezSessionState tezSessionState) // session in the SessionState } - public void close(TezSessionState tezSessionState) throws Exception { + public void close(TezSessionState tezSessionState, boolean keepTmpDir) throws Exception { LOG.info("Closing tez session default? " + tezSessionState.isDefault()); if (!tezSessionState.isDefault()) { - tezSessionState.close(false); + tezSessionState.close(keepTmpDir); } } @@ -262,19 +262,19 @@ public TezSessionState getSession(TezSessionState session, HiveConf conf, } if (session != null) { - close(session); + close(session, false); } return getSession(conf, doOpen, forceCreate); } - public void closeAndOpen(TezSessionState sessionState, HiveConf conf) + public void closeAndOpen(TezSessionState sessionState, HiveConf conf, boolean keepTmpDir) throws Exception { HiveConf sessionConf = sessionState.getConf(); if (sessionConf != null && sessionConf.get("tez.queue.name") != null) { conf.set("tez.queue.name", sessionConf.get("tez.queue.name")); } - close(sessionState); + close(sessionState, keepTmpDir); sessionState.open(conf); } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index 62de830..bd07605 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@ -310,7 +310,7 @@ DAGClient submit(JobConf conf, DAG dag, Path scratchDir, console.printInfo("Tez session was closed. Reopening..."); // close the old one, but keep the tmp files around - TezSessionPoolManager.getInstance().closeAndOpen(sessionState, this.conf); + TezSessionPoolManager.getInstance().closeAndOpen(sessionState, this.conf, true); console.printInfo("Session re-established."); dagClient = sessionState.getSession().submitDAG(dag, resourceMap); diff --git ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java index 9798cf3..f21956b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java +++ ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java @@ -1062,7 +1062,7 @@ public void close() throws IOException { try { if (tezSessionState != null) { - TezSessionPoolManager.getInstance().close(tezSessionState); + TezSessionPoolManager.getInstance().close(tezSessionState, false); } } catch (Exception e) { LOG.info("Error closing tez session", e); diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java index 0835bde..028c856 100644 --- ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java +++ ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java @@ -208,7 +208,7 @@ public void testSubmit() throws Exception { task.submit(conf, dag, path, appLr, sessionState, new LinkedList()); // validate close/reopen verify(sessionState, times(1)).open(any(HiveConf.class)); - verify(sessionState, times(1)).close(eq(false)); // now uses pool after HIVE-7043 + verify(sessionState, times(1)).close(eq(true)); // now uses pool after HIVE-7043 verify(session, times(2)).submitDAG(any(DAG.class), any(Map.class)); }