Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java	(revision 992148)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java	(working copy)
@@ -281,39 +281,31 @@
    * @return true if fatal errors happened during job execution, false
    *         otherwise.
    */
-  private boolean checkFatalErrors(ExecDriverTaskHandle th, StringBuilder errMsg) {
-    RunningJob rj = th.getRunningJob();
-    try {
-      Counters ctrs = th.getCounters();
-      if (ctrs == null) {
-        // hadoop might return null if it cannot locate the job.
-        // we may still be able to retrieve the job status - so ignore
-        return false;
+  private boolean checkFatalErrors(Counters ctrs, StringBuilder errMsg) {
+    if (ctrs == null) {
+      // hadoop might return null if it cannot locate the job.
+      // we may still be able to retrieve the job status - so ignore
+      return false;
+    }
+    // check for number of created files
+    long numFiles = ctrs.getCounter(ProgressCounter.CREATED_FILES);
+    long upperLimit = HiveConf.getLongVar(job, HiveConf.ConfVars.MAXCREATEDFILES);
+    if (numFiles > upperLimit) {
+      errMsg.append("total number of created files exceeds ").append(upperLimit);
+      return true;
+    }
+
+    for (Operator op : work.getAliasToWork().values()) {
+      if (op.checkFatalErrors(ctrs, errMsg)) {
+        return true;
       }
-      // check for number of created files
-      long numFiles = ctrs.getCounter(ProgressCounter.CREATED_FILES);
-      long upperLimit = HiveConf.getLongVar(job, HiveConf.ConfVars.MAXCREATEDFILES);
-      if (numFiles > upperLimit) {
-        errMsg.append("total number of created files exceeds ").append(upperLimit);
+    }
+    if (work.getReducer() != null) {
+      if (work.getReducer().checkFatalErrors(ctrs, errMsg)) {
        return true;
      }
-
-      for (Operator op : work.getAliasToWork().values()) {
-        if (op.checkFatalErrors(ctrs, errMsg)) {
-          return true;
-        }
-      }
-      if (work.getReducer() != null) {
-        if (work.getReducer().checkFatalErrors(ctrs, errMsg)) {
-          return true;
-        }
-      }
-      return false;
-    } catch (IOException e) {
-      // this exception can be tolerated
-      e.printStackTrace();
-      return false;
    }
+    return false;
  }
 
  private boolean progress(ExecDriverTaskHandle th) throws IOException {
@@ -354,6 +346,7 @@
        throw new IOException("Could not find status of job: + rj.getJobID()");
      } else {
        th.setRunningJob(newRj);
+        rj = newRj;
      }
 
      // If fatal errors happen we should kill the job immediately rather than
@@ -361,7 +354,10 @@
      if (fatal) {
        continue; // wait until rj.isComplete
      }
-      if (fatal = checkFatalErrors(th, errMsg)) {
+
+      Counters ctrs = th.getCounters();
+
+      if (fatal = checkFatalErrors(ctrs, errMsg)) {
        console.printError("[Fatal Error] " + errMsg.toString() + ". "
            + "Killing the job.");
        rj.killJob();
@@ -369,7 +365,7 @@
      }
 
      errMsg.setLength(0);
-      updateCounters(th);
+      updateCounters(ctrs, rj);
 
      String report = " " + getId() + " map = " + mapProgress + "%, reduce = "
          + reduceProgress + "%";
@@ -384,7 +380,7 @@
      SessionState ss = SessionState.get();
      if (ss != null) {
        ss.getHiveHistory().setTaskCounters(SessionState.get().getQueryId(),
-            getId(), rj);
+            getId(), ctrs);
        ss.getHiveHistory().setTaskProperty(SessionState.get().getQueryId(),
            getId(), Keys.TASK_HADOOP_PROGRESS, output);
        ss.getHiveHistory().progressTask(SessionState.get().getQueryId(),
@@ -398,12 +394,14 @@
    }
 
    boolean success;
+    Counters ctrs = th.getCounters();
+
    if (fatal) {
      success = false;
    } else {
      // check for fatal error again in case it occurred after
      // the last check before the job is completed
-      if (checkFatalErrors(th, errMsg)) {
+      if (checkFatalErrors(ctrs, errMsg)) {
        console.printError("[Fatal Error] " + errMsg.toString());
        success = false;
      } else {
@@ -413,7 +411,7 @@
    setDone();
 
    // update based on the final value of the counters
-    updateCounters(th);
+    updateCounters(ctrs, rj);
 
    SessionState ss = SessionState.get();
    if (ss != null) {
@@ -426,15 +424,13 @@
  /**
   * Update counters relevant to this task.
   */
-  private void updateCounters(ExecDriverTaskHandle th) throws IOException {
-    RunningJob rj = th.getRunningJob();
+  private void updateCounters(Counters ctrs, RunningJob rj) throws IOException {
    mapProgress = Math.round(rj.mapProgress() * 100);
    reduceProgress = Math.round(rj.reduceProgress() * 100);
    taskCounters.put("CNTR_NAME_" + getId() + "_MAP_PROGRESS", Long
        .valueOf(mapProgress));
    taskCounters.put("CNTR_NAME_" + getId() + "_REDUCE_PROGRESS", Long
        .valueOf(reduceProgress));
-    Counters ctrs = th.getCounters();
    if (ctrs == null) {
      // hadoop might return null if it cannot locate the job.
      // we may still be able to retrieve the job status - so ignore
Index: ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java	(revision 992148)
+++ ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java	(working copy)
@@ -42,6 +42,7 @@
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.hadoop.mapred.Counters.Group;
+import org.apache.hadoop.mapred.Counters;
 
 /**
  * HiveHistory.
@@ -352,19 +353,19 @@
   * @param taskId
   * @param rj
   */
-  public void setTaskCounters(String queryId, String taskId, RunningJob rj) {
+  public void setTaskCounters(String queryId, String taskId, Counters ctrs) {
    String id = queryId + ":" + taskId;
    QueryInfo ji = queryInfoMap.get(queryId);
    StringBuilder sb1 = new StringBuilder("");
    TaskInfo ti = taskInfoMap.get(id);
-    if (ti == null) {
+    if ((ti == null) || (ctrs == null)) {
      return;
    }
    StringBuilder sb = new StringBuilder("");
    try {
 
      boolean first = true;
-      for (Group group : rj.getCounters()) {
+      for (Group group : ctrs) {
        for (Counter counter : group) {
          if (first) {
            first = false;
@@ -391,7 +392,7 @@
        }
      }
    } catch (Exception e) {
-      e.printStackTrace();
+      LOG.warn(org.apache.hadoop.util.StringUtils.stringifyException(e));
    }
    if (sb1.length() > 0) {
      taskInfoMap.get(id).hm.put(Keys.ROWS_INSERTED.name(), sb1.toString());
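
The hunks above all follow one pattern: the polling loop fetches the job's Counters once per cycle and hands that single snapshot to every consumer (fatal-error check, progress bookkeeping, HiveHistory), instead of each helper re-querying the JobTracker through RunningJob. Below is a minimal, self-contained Java sketch of that pattern, separate from the patch itself; apart from org.apache.hadoop.mapred.Counters and RunningJob, the class and method names are illustrative only and do not appear in Hive.

import java.io.IOException;

import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.RunningJob;

// Illustrative sketch only, not part of the patch.
class CounterSnapshotSketch {
  // One poll cycle: a single getCounters() call, reused by all consumers.
  static void pollOnce(RunningJob rj) throws IOException {
    Counters ctrs = rj.getCounters();   // single round trip to the JobTracker
    if (ctrs == null) {
      // hadoop may return null if it cannot locate the job; skip this cycle
      return;
    }
    checkFatalErrors(ctrs);             // consumer 1: fatal-error check
    updateProgress(ctrs, rj);           // consumer 2: progress / counter bookkeeping
    publishToHistory(ctrs);             // consumer 3: history logging
  }

  static void checkFatalErrors(Counters ctrs) {
    // inspect counter values, e.g. a created-files counter against a configured limit
  }

  static void updateProgress(Counters ctrs, RunningJob rj) throws IOException {
    // record map/reduce progress from rj and counter values from the ctrs snapshot
  }

  static void publishToHistory(Counters ctrs) {
    // serialize counter groups for a history log, as HiveHistory.setTaskCounters does
  }
}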