Retry Tez DAG status polling and reopen a closed Tez session on submit.

- TezJobMonitor.monitorExecution: sleep checkInterval between polls inside the
  try block, print the exception message, and kill the DAG on every
  (maxRetryInterval / checkInterval)-th failure or on InterruptedException;
  otherwise print "Retrying..." and poll again. The divisor is parenthesised
  because '%' and '/' share precedence and associate left to right; without
  the parentheses the condition is already true on the first failure.
- TezSessionState.close(boolean keepTmpDir): ignore SessionNotRunning thrown by
  session.stop() and delete the scratch dir only when keepTmpDir is false.
- TezTask.submit: on SessionNotRunning, close the session keeping its tmp
  files, reopen it, and submit the DAG again.
- SessionState.close: call tezSessionState.close(false).

diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
index aef8f0a..9c727de 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
@@ -69,7 +69,7 @@ public int monitorExecution(DAGClient dagClient) throws InterruptedException {
     boolean done = false;
     int checkInterval = 500;
     int printInterval = 3000;
-    int maxRetryInterval = 5000;
+    int maxRetryInterval = 2500;
     int counter = 0;
     int failedCounter = 0;
     int rc = 0;
@@ -86,7 +86,6 @@ public int monitorExecution(DAGClient dagClient) throws InterruptedException {
       try {
         status = dagClient.getDAGStatus();
         Map progressMap = status.getVertexProgress();
-        failedCounter = 0;
         DAGStatus.State state = status.getState();
 
         if (state != lastState || state == RUNNING) {
@@ -132,9 +131,15 @@ public int monitorExecution(DAGClient dagClient) throws InterruptedException {
             break;
           }
         }
+        if (!done) {
+          Thread.sleep(checkInterval);
+        }
       } catch (Exception e) {
-        if (failedCounter % maxRetryInterval/checkInterval == 0) {
+        console.printInfo("Exception: "+e.getMessage());
+        if (++failedCounter % (maxRetryInterval/checkInterval) == 0
+            || e instanceof InterruptedException) {
           try {
+            console.printInfo("Killing DAG...");
             dagClient.tryKillDAG();
           } catch(IOException io) {
             // best effort
@@ -145,18 +150,19 @@ public int monitorExecution(DAGClient dagClient) throws InterruptedException {
           console.printError("Execution has failed.");
           rc = 1;
           done = true;
+        } else {
+          console.printInfo("Retrying...");
         }
-      }
-
-      if (done) {
-        if (rc != 0 && status != null) {
-          for (String diag: status.getDiagnostics()) {
-            console.printError(diag);
+      } finally {
+        if (done) {
+          if (rc != 0 && status != null) {
+            for (String diag: status.getDiagnostics()) {
+              console.printError(diag);
+            }
           }
+          break;
         }
-        break;
       }
-      Thread.sleep(500);
     }
     perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_RUN_DAG);
     return rc;
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
index 716eacb..fe13dc1 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
@@ -40,6 +40,7 @@
 import org.apache.tez.client.AMConfiguration;
 import org.apache.tez.client.TezSession;
 import org.apache.tez.client.TezSessionConfiguration;
+import org.apache.tez.dag.api.SessionNotRunning;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
@@ -131,15 +132,22 @@ public void open(String sessionId, HiveConf conf)
    * @throws IOException
    * @throws TezException
    */
-  public void close() throws TezException, IOException {
+  public void close(boolean keepTmpDir) throws TezException, IOException {
     if (!isOpen()) {
       return;
     }
 
     LOG.info("Closing Tez Session");
-    session.stop();
-    FileSystem fs = tezScratchDir.getFileSystem(conf);
-    fs.delete(tezScratchDir, true);
+    try {
+      session.stop();
+    } catch (SessionNotRunning nr) {
+      // ignore
+    }
+
+    if (!keepTmpDir) {
+      FileSystem fs = tezScratchDir.getFileSystem(conf);
+      fs.delete(tezScratchDir, true);
+    }
     session = null;
     tezScratchDir = null;
     conf = null;
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index c82d794..4eda92c 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -19,11 +19,14 @@
 package org.apache.hadoop.hive.ql.exec.tez;
 
 import java.io.IOException;
+import java.net.URISyntaxException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import javax.security.auth.login.LoginException;
+
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -34,6 +37,7 @@
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.TezWork;
 import org.apache.hadoop.hive.ql.plan.TezWork.EdgeType;
@@ -45,6 +49,7 @@
 import org.apache.tez.client.TezSession;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.SessionNotRunning;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.client.DAGClient;
@@ -194,12 +199,34 @@ private DAG build(JobConf conf, TezWork work, Path scratchDir,
 
   private DAGClient submit(JobConf conf, DAG dag, Path scratchDir,
       LocalResource appJarLr, TezSession session)
-      throws IOException, TezException, InterruptedException {
+      throws IOException, TezException, InterruptedException,
+      LoginException, URISyntaxException, HiveException {
 
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_SUBMIT_DAG);
+    DAGClient dagClient = null;
+
+    try {
+      // ready to start execution on the cluster
+      dagClient = session.submitDAG(dag);
+    } catch (SessionNotRunning nr) {
+      console.printInfo("Tez session was closed. Reopening...");
 
-    // ready to start execution on the cluster
-    DAGClient dagClient = session.submitDAG(dag);
+      // Need to remove this static hack. But this is the way currently to
+      // get a session.
+      SessionState ss = SessionState.get();
+      TezSessionState tezSession = ss.getTezSession();
+
+      // close the old one, but keep the tmp files around
+      tezSession.close(true);
+
+      // (re)open the session
+      tezSession.open(ss.getSessionId(), this.conf);
+      session = tezSession.getSession();
+
+      console.printInfo("Session re-established.");
+
+      dagClient = session.submitDAG(dag);
+    }
 
     perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_SUBMIT_DAG);
     return dagClient;
diff --git ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
index a9725c9..40b2f53 100644
--- ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
+++ ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
@@ -808,7 +808,7 @@ public void close() throws IOException {
     }
 
     try {
-      tezSessionState.close();
+      tezSessionState.close(false);
     } catch (Exception e) {
       LOG.info("Error closing tez session", e);
     }