diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java index c315dd2..7ec926c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java @@ -78,7 +78,7 @@ public void run() { try { for (TezSessionState s: TezSessionState.getOpenSessions()) { System.err.println("Shutting down tez session."); - TezSessionPoolManager.getInstance().close(s); + TezSessionPoolManager.getInstance().close(s, false); } } catch (Exception e) { // ignore diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java index 0d0ac41..5d647b5 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.exec.tez; +import java.util.Arrays; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; @@ -168,10 +169,10 @@ public void returnSession(TezSessionState tezSessionState) // session in the SessionState } - public void close(TezSessionState tezSessionState) throws Exception { + public void close(TezSessionState tezSessionState, boolean keepTmpDir) throws Exception { LOG.info("Closing tez session default? " + tezSessionState.isDefault()); if (!tezSessionState.isDefault()) { - tezSessionState.close(false); + tezSessionState.close(keepTmpDir); } } @@ -262,19 +263,19 @@ public TezSessionState getSession(TezSessionState session, HiveConf conf, } if (session != null) { - close(session); + close(session, false); } return getSession(conf, doOpen, forceCreate); } - public void closeAndOpen(TezSessionState sessionState, HiveConf conf) + public void closeAndOpen(TezSessionState sessionState, HiveConf conf, boolean keepTmpDir) throws Exception { HiveConf sessionConf = sessionState.getConf(); if (sessionConf != null && sessionConf.get("tez.queue.name") != null) { conf.set("tez.queue.name", sessionConf.get("tez.queue.name")); } - close(sessionState); - sessionState.open(conf); + close(sessionState, keepTmpDir); + sessionState.open(conf, null); } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index 951e918..e01453a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@ -310,7 +310,7 @@ DAGClient submit(JobConf conf, DAG dag, Path scratchDir, console.printInfo("Tez session was closed. Reopening..."); // close the old one, but keep the tmp files around - TezSessionPoolManager.getInstance().closeAndOpen(sessionState, this.conf); + TezSessionPoolManager.getInstance().closeAndOpen(sessionState, this.conf, true); console.printInfo("Session re-established."); dagClient = sessionState.getSession().submitDAG(dag, resourceMap); diff --git ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java index 9798cf3..f21956b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java +++ ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java @@ -1062,7 +1062,7 @@ public void close() throws IOException { try { if (tezSessionState != null) { - TezSessionPoolManager.getInstance().close(tezSessionState); + TezSessionPoolManager.getInstance().close(tezSessionState, false); } } catch (Exception e) { LOG.info("Error closing tez session", e);