diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java index 1cabca6..9211d6c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java @@ -21,7 +21,10 @@ import static org.apache.tez.dag.api.client.DAGStatus.State.RUNNING; import java.io.IOException; +import java.util.Collections; import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.SortedSet; @@ -54,7 +57,32 @@ private final int printInterval = 3000; private long lastPrintTime; private Set completed; - + private static final List shutdownList; + + static { + shutdownList = Collections.synchronizedList(new LinkedList()); + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + for (DAGClient c: shutdownList) { + try { + System.err.println("Trying to shutdown DAG"); + c.tryKillDAG(); + } catch (Exception e) { + // ignore + } + } + try { + for (TezSessionState s: TezSessionState.getOpenSessions()) { + System.err.println("Shutting down tez session."); + s.close(false); + } + } catch (Exception e) { + // ignore + } + } + }); + } public TezJobMonitor() { console = new LogHelper(LOG); @@ -67,7 +95,7 @@ public TezJobMonitor() { * @param dagClient client that was used to kick off the job * @return int 0 - success, 1 - killed, 2 - failed */ - public int monitorExecution(DAGClient dagClient) throws InterruptedException { + public int monitorExecution(final DAGClient dagClient) throws InterruptedException { DAGStatus status = null; completed = new HashSet(); @@ -79,6 +107,8 @@ public int monitorExecution(DAGClient dagClient) throws InterruptedException { String lastReport = null; Set opts = new HashSet(); + shutdownList.add(dagClient); + console.printInfo("\n"); perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_DAG); perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_SUBMIT_TO_RUNNING); @@ -163,6 +193,7 @@ public int monitorExecution(DAGClient dagClient) throws InterruptedException { console.printError(diag); } } + shutdownList.remove(dagClient); break; } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java index 09b390c..165aa49 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java @@ -20,7 +20,10 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.net.URISyntaxException; +import java.util.Collections; import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; import java.util.Map; import javax.security.auth.login.LoginException; @@ -56,6 +59,9 @@ private TezSession session; private String sessionId; + private static List openSessions + = Collections.synchronizedList(new LinkedList()); + /** * Constructor. We do not automatically connect, because we only want to * load tez classes when the user has tez installed. @@ -71,6 +77,14 @@ public boolean isOpen() { } /** + * Get all open sessions. Only used to clean up at shutdown. + * @return List + */ + public static List getOpenSessions() { + return openSessions; + } + + /** * Creates a tez session. A session is tied to either a cli/hs2 session. You can * submit multiple DAGs against a session (as long as they are executed serially). * @throws IOException @@ -116,6 +130,8 @@ public void open(String sessionId, HiveConf conf) // id is used for tez to reuse the current session rather than start a new one. conf.set("mapreduce.framework.name", "yarn-tez"); conf.set("mapreduce.tez.session.tokill-application-id", session.getApplicationId().toString()); + + openSessions.add(this); } /** @@ -132,6 +148,7 @@ public void close(boolean keepTmpDir) throws TezException, IOException { LOG.info("Closing Tez Session"); try { session.stop(); + openSessions.remove(this); } catch (SessionNotRunning nr) { // ignore }