diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java index 2a55527..e584e6e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java @@ -187,6 +187,7 @@ public void setChildTasks(List> childTasks) { this.childTasks = childTasks; } + @Override public List getChildren() { return getChildTasks(); } @@ -521,7 +522,7 @@ Throwable getException() { return exception; } - void setException(Throwable ex) { + protected void setException(Throwable ex) { exception = ex; } @@ -542,10 +543,12 @@ public String toString() { return getId() + ":" + getType(); } + @Override public int hashCode() { return toString().hashCode(); } + @Override public boolean equals(Object obj) { return toString().equals(String.valueOf(obj)); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java index fea7eb4..f1a7f46 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java @@ -98,7 +98,7 @@ // in-place progress update related variables private int lines; - private PrintStream out; + private final PrintStream out; private String separator; private transient LogHelper console; @@ -115,6 +115,8 @@ private final NumberFormat commaFormat; private static final List shutdownList; + private StringBuffer diagnostics; + static { shutdownList = Collections.synchronizedList(new LinkedList()); Runtime.getRuntime().addShutdownHook(new Thread() { @@ -251,6 +253,7 @@ public int monitorExecution(final DAGClient dagClient, HiveTxnManager txnMgr, Hi DAG dag) throws InterruptedException { DAGStatus status = null; completed = new HashSet(); + diagnostics = new StringBuffer(); boolean running = false; boolean done = false; @@ -396,6 +399,7 @@ public int monitorExecution(final DAGClient dagClient, HiveTxnManager txnMgr, Hi if (rc != 0 && status != null) { for (String diag : status.getDiagnostics()) { console.printError(diag); + diagnostics.append(diag); } } shutdownList.remove(dagClient); @@ -800,11 +804,11 @@ private String getReport(Map progressMap) { perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s); } if(complete < total && (complete > 0 || running > 0 || failed > 0)) { - + if (!perfLogger.startTimeHasMethod(PerfLogger.TEZ_RUN_VERTEX + s)) { perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s); } - + /* vertex is started, but not complete */ if (failed > 0) { reportBuffer.append(String.format("%s: %d(+%d,-%d)/%d\t", s, complete, running, failed, total)); @@ -825,4 +829,8 @@ private String getReport(Map progressMap) { return reportBuffer.toString(); } + + public String getDiagnostics() { + return diagnostics.toString(); + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index fc83a86..d79d38a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@ -27,15 +27,16 @@ import java.util.Map; import java.util.Set; -import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.DriverContext; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.log.PerfLogger; +import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.BaseWork; import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.ql.plan.MergeJoinWork; @@ -48,7 +49,6 @@ import org.apache.hadoop.hive.ql.plan.api.StageType; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; @@ -59,7 +59,6 @@ import org.apache.tez.dag.api.Edge; import org.apache.tez.dag.api.GroupInputEdge; import org.apache.tez.dag.api.SessionNotRunning; -import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.api.Vertex; import org.apache.tez.dag.api.VertexGroup; import org.apache.tez.dag.api.client.DAGClient; @@ -165,6 +164,9 @@ public int execute(DriverContext driverContext) { // finally monitor will print progress until the job is done TezJobMonitor monitor = new TezJobMonitor(); rc = monitor.monitorExecution(client, ctx.getHiveTxnManager(), conf, dag); + if (rc != 0) { + this.setException(new HiveException(monitor.getDiagnostics())); + } // fetch the counters Set statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS);