diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java index ec71c72..48c6b6a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java @@ -78,7 +78,7 @@ public void run() { try { for (TezSessionState s: TezSessionState.getOpenSessions()) { System.err.println("Shutting down tez session."); - TezSessionPoolManager.getInstance().close(s); + TezSessionPoolManager.getInstance().close(s, false); } } catch (Exception e) { // ignore diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java index 7487253..a4fd36d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java @@ -168,10 +168,10 @@ public void returnSession(TezSessionState tezSessionState) // session in the SessionState } - public void close(TezSessionState tezSessionState) throws Exception { + public void close(TezSessionState tezSessionState, boolean keepTmpDir) throws Exception { LOG.info("Closing tez session default? " + tezSessionState.isDefault()); if (!tezSessionState.isDefault()) { - tezSessionState.close(false); + tezSessionState.close(keepTmpDir); } } @@ -262,24 +262,24 @@ public TezSessionState getSession(TezSessionState session, HiveConf conf, } if (session != null) { - close(session); + close(session, false); } return getSession(conf, doOpen, forceCreate); } - public void closeAndOpen(TezSessionState sessionState, HiveConf conf) + public void closeAndOpen(TezSessionState sessionState, HiveConf conf, boolean keepTmpDir) throws Exception { - closeAndOpen(sessionState, conf, null); + closeAndOpen(sessionState, conf, null, keepTmpDir); } public void closeAndOpen(TezSessionState sessionState, HiveConf conf, - String[] additionalFiles) throws Exception { + String[] additionalFiles, boolean keepTmpDir) throws Exception { HiveConf sessionConf = sessionState.getConf(); if (sessionConf != null && sessionConf.get("tez.queue.name") != null) { conf.set("tez.queue.name", sessionConf.get("tez.queue.name")); } - close(sessionState); + close(sessionState, keepTmpDir); sessionState.open(conf, additionalFiles); } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index 232ff2b..93e0fac 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@ -370,7 +370,7 @@ DAGClient submit(JobConf conf, DAG dag, Path scratchDir, console.printInfo("Tez session was closed. Reopening..."); // close the old one, but keep the tmp files around - TezSessionPoolManager.getInstance().closeAndOpen(sessionState, this.conf, inputOutputJars); + TezSessionPoolManager.getInstance().closeAndOpen(sessionState, this.conf, inputOutputJars, true); console.printInfo("Session re-established."); dagClient = sessionState.getSession().submitDAG(dag); diff --git ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java index eca4126..af633cb 100644 --- ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java +++ ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java @@ -1245,7 +1245,7 @@ public void close() throws IOException { try { if (tezSessionState != null) { - TezSessionPoolManager.getInstance().close(tezSessionState); + TezSessionPoolManager.getInstance().close(tezSessionState, false); } } catch (Exception e) { LOG.info("Error closing tez session", e); diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java index c522687..c6ac557 100644 --- ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java +++ ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java @@ -165,7 +165,7 @@ public void testCloseAndOpenDefault() throws Exception { TezSessionState session = Mockito.mock(TezSessionState.class); Mockito.when(session.isDefault()).thenReturn(false); - poolManager.closeAndOpen(session, conf); + poolManager.closeAndOpen(session, conf, false); Mockito.verify(session).close(false); Mockito.verify(session).open(conf, null); @@ -178,7 +178,7 @@ public void testCloseAndOpenWithResources() throws Exception { Mockito.when(session.isDefault()).thenReturn(false); String[] extraResources = new String[] { "file:///tmp/foo.jar" }; - poolManager.closeAndOpen(session, conf, extraResources); + poolManager.closeAndOpen(session, conf, extraResources, false); Mockito.verify(session).close(false); Mockito.verify(session).open(conf, extraResources); diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java index 08c9e16..d004a27 100644 --- ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java +++ ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java @@ -214,7 +214,7 @@ public void testSubmit() throws Exception { new String[0], Collections. emptyMap()); // validate close/reopen verify(sessionState, times(1)).open(any(HiveConf.class), any(String[].class)); - verify(sessionState, times(1)).close(eq(false)); // now uses pool after HIVE-7043 + verify(sessionState, times(1)).close(eq(true)); // now uses pool after HIVE-7043 verify(session, times(2)).submitDAG(any(DAG.class)); }