diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
index b1e9235..c5539ff 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
@@ -213,6 +213,28 @@ public void stop() throws Exception {
     }
   }
 
+  /**
+   * Closes the given session and removes it from {@code openSessions}. This is called only in
+   * extreme cases where even our retry of submit fails. This method closes even default
+   * sessions.
+   *
+   * @param tezSessionState
+   *          the session to be closed
+   * @throws Exception
+   *           if closing the session fails; the session is still removed from
+   *           {@code openSessions} in that case
+   */
+  public void destroySession(TezSessionState tezSessionState) throws Exception {
+    LOG.warn("We are closing a " + (tezSessionState.isDefault() ? "default" : "non-default")
+        + " session because of retry failure.");
+    try {
+      tezSessionState.close(false);
+    } finally {
+      // Stop tracking the session even when close() throws, so it does not linger in the list.
+      openSessions.remove(tezSessionState);
+    }
+  }
+
   protected TezSessionState createSession(String sessionId) {
     return new TezSessionState(sessionId);
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index 2cf990c..032a9e6 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.exec.tez;
 
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
@@ -422,17 +423,41 @@ DAGClient submit(JobConf conf, DAG dag, Path scratchDir,
     }
 
     try {
-      // ready to start execution on the cluster
-      sessionState.getSession().addAppMasterLocalFiles(resourceMap);
-      dagClient = sessionState.getSession().submitDAG(dag);
-    } catch (SessionNotRunning nr) {
-      console.printInfo("Tez session was closed. Reopening...");
-
-      // close the old one, but keep the tmp files around
-      TezSessionPoolManager.getInstance().closeAndOpen(sessionState, this.conf, inputOutputJars, true);
-      console.printInfo("Session re-established.");
-
-      dagClient = sessionState.getSession().submitDAG(dag);
+      try {
+        // ready to start execution on the cluster
+        sessionState.getSession().addAppMasterLocalFiles(resourceMap);
+        dagClient = sessionState.getSession().submitDAG(dag);
+      } catch (SessionNotRunning nr) {
+        console.printInfo("Tez session was closed. Reopening...");
+
+        // close the old one, but keep the tmp files around
+        TezSessionPoolManager.getInstance().closeAndOpen(sessionState, this.conf, inputOutputJars,
+            true);
+        console.printInfo("Session re-established.");
+
+        dagClient = sessionState.getSession().submitDAG(dag);
+      }
+    } catch (Exception e) {
+      // Any other failure (including a failed SessionNotRunning recovery above): report the
+      // original error right away so it is not lost if reopening the session fails, then retry
+      // once after closing and reopening the session.
+      console.printInfo("Dag submit failed due to " + e.getMessage() + " stack trace: "
+          + Arrays.toString(e.getStackTrace()) + " retrying...");
+      try {
+        TezSessionPoolManager.getInstance().closeAndOpen(sessionState, this.conf, inputOutputJars,
+            true);
+        dagClient = sessionState.getSession().submitDAG(dag);
+      } catch (Exception retryException) {
+        // we failed to submit after retrying. Destroy session and bail.
+        try {
+          TezSessionPoolManager.getInstance().destroySession(sessionState);
+        } catch (Exception destroyException) {
+          // Cleanup is best effort; do not let it mask the reason the retry failed.
+          console.printInfo("Failed to destroy the Tez session after a failed retry due to "
+              + destroyException.getMessage());
+        }
+        throw retryException;
+      }
     }
 
     perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_SUBMIT_DAG);
diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
index c148aae..3354219 100644
--- ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
+++ ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
@@ -20,17 +20,22 @@
 
 import static org.junit.Assert.*;
 
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Random;
 
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
 
 public class TestTezSessionPool {
 
+  private static final Log LOG = LogFactory.getLog(TestTezSessionPool.class);
   HiveConf conf;
   Random random;
   private TezSessionPoolManager poolManager;
@@ -214,6 +219,18 @@ public void testCloseAndOpenDefault() throws Exception {
   }
 
   @Test
+  public void testSessionDestroy() throws Exception {
+    poolManager = new TestTezSessionPoolManager();
+    TezSessionState session = Mockito.mock(TezSessionState.class);
+    Mockito.when(session.isDefault()).thenReturn(false);
+
+    poolManager.destroySession(session);
+
+    // the session handed to destroySession must actually be closed
+    Mockito.verify(session).close(false);
+  }
+
+  @Test
   public void testCloseAndOpenWithResources() throws Exception {
     poolManager = new TestTezSessionPoolManager();
     TezSessionState session = Mockito.mock(TezSessionState.class);