Index: ql/src/test/results/clientnegative/fatal.q.out
===================================================================
--- ql/src/test/results/clientnegative/fatal.q.out	(revision 0)
+++ ql/src/test/results/clientnegative/fatal.q.out	(revision 0)
@@ -0,0 +1,5 @@
+PREHOOK: query: select /*+ mapjoin(b) */ * from src a join src b on (a.key=b.key)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/tmp/nzhang/hive_2010-08-02_13-41-52_752_1156521578782717030/-mr-10000
+FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.MapRedTask
Index: ql/src/test/results/clientpositive/fatal.q.out
===================================================================
--- ql/src/test/results/clientpositive/fatal.q.out	(revision 981661)
+++ ql/src/test/results/clientpositive/fatal.q.out	(working copy)
@@ -1,8 +0,0 @@
-PREHOOK: query: select /*+ mapjoin(b) */ * from src a join src b on (a.key=b.key)
-PREHOOK: type: QUERY
-PREHOOK: Input: default@src
-PREHOOK: Output: file:/data/users/nzhang/work/876/apache-hive/build/ql/tmp/1224835023/10000
-POSTHOOK: query: select /*+ mapjoin(b) */ * from src a join src b on (a.key=b.key)
-POSTHOOK: type: QUERY
-POSTHOOK: Input: default@src
-POSTHOOK: Output: file:/data/users/nzhang/work/876/apache-hive/build/ql/tmp/1224835023/10000
Index: ql/src/test/queries/clientnegative/fatal.q
===================================================================
--- ql/src/test/queries/clientnegative/fatal.q	(revision 0)
+++ ql/src/test/queries/clientnegative/fatal.q	(revision 0)
@@ -0,0 +1,4 @@
+set hive.mapjoin.maxsize=1;
+set hive.task.progress=true;
+
+select /*+ mapjoin(b) */ * from src a join src b on (a.key=b.key);
Index: ql/src/test/queries/clientpositive/fatal.q
===================================================================
--- ql/src/test/queries/clientpositive/fatal.q	(revision 981661)
+++ ql/src/test/queries/clientpositive/fatal.q	(working copy)
@@ -1,4 +0,0 @@
-set hive.mapjoin.maxsize=1;
-set hive.task.progress=true;
-
-select /*+ mapjoin(b) */ * from src a join src b on (a.key=b.key);
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java	(revision 981661)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java	(working copy)
@@ -348,30 +348,30 @@
    * @return String null if job is eligible for local mode, reason otherwise
    */
   public static String isEligibleForLocalMode(HiveConf conf,
-                                              ContentSummary inputSummary,
-                                              int numReducers) {
+      ContentSummary inputSummary,
+      int numReducers) {
 
     long maxBytes = conf.getLongVar(HiveConf.ConfVars.LOCALMODEMAXBYTES);
     long maxTasks = conf.getIntVar(HiveConf.ConfVars.LOCALMODEMAXTASKS);
 
     // check for max input size
     if (inputSummary.getLength() > maxBytes)
-      return "Input Size (= " + maxBytes + ") is larger than " +
-          HiveConf.ConfVars.LOCALMODEMAXBYTES.varname + " (= " + maxBytes + ")";
+      return "Input Size (= " + inputSummary.getLength() + ") is larger than " +
+        HiveConf.ConfVars.LOCALMODEMAXBYTES.varname + " (= " + maxBytes + ")";
 
     // ideally we would like to do this check based on the number of splits
     // in the absence of an easy way to get the number of splits - do this
     // based on the total number of files (pessimistically assumming that
     // splits are equal to number of files in worst case)
     if (inputSummary.getFileCount() > maxTasks)
-      return "Number of Input Files (= " + inputSummary.getFileCount() +
-          ") is larger than " +
-          HiveConf.ConfVars.LOCALMODEMAXTASKS.varname + "(= " + maxTasks + ")";
+      return "Number of Input Files (= " + inputSummary.getFileCount() +
+        ") is larger than " +
+        HiveConf.ConfVars.LOCALMODEMAXTASKS.varname + "(= " + maxTasks + ")";
 
     // since local mode only runs with 1 reducers - make sure that the
     // the number of reducers (set by user or inferred) is <=1
     if (numReducers > 1)
-        return "Number of reducers (= " + numReducers + ") is more than 1";
+      return "Number of reducers (= " + numReducers + ") is more than 1";
 
     return null;
   }
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java	(revision 981661)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java	(working copy)
@@ -97,7 +97,6 @@
   protected transient JobConf job;
   protected transient int mapProgress = 0;
   protected transient int reduceProgress = 0;
-  protected transient boolean success = false; // if job execution is successful
 
   /**
    * Constructor when invoked from QL.
@@ -289,10 +288,12 @@
     RunningJob rj = th.getRunningJob();
     try {
       Counters ctrs = th.getCounters();
-      // HIVE-1422
       if (ctrs == null) {
+        // hadoop might return null if it cannot locate the job.
+        // we may still be able to retrieve the job status - so ignore
        return false;
       }
+
       for (Operator op : work.getAliasToWork().values()) {
         if (op.checkFatalErrors(ctrs, errMsg)) {
           return true;
         }
       }
@@ -311,7 +312,7 @@
     }
   }
 
-  private void progress(ExecDriverTaskHandle th) throws IOException {
+  private boolean progress(ExecDriverTaskHandle th) throws IOException {
     JobClient jc = th.getJobClient();
     RunningJob rj = th.getRunningJob();
     String lastReport = "";
@@ -339,15 +340,24 @@
         // rj.getJobState() again and we do not want to do an extra RPC call
         initializing = false;
       }
-      th.setRunningJob(jc.getJob(rj.getJobID()));
+      RunningJob newRj = jc.getJob(rj.getJobID());
+      if (newRj == null) {
+        // under exceptional load, hadoop may not be able to look up status
+        // of finished jobs (because it has purged them from memory). From
+        // hive's perspective - it's equivalent to the job having failed.
+        // So raise a meaningful exception
+        throw new IOException("Could not find status of job: " + rj.getJobID());
+      } else {
+        th.setRunningJob(newRj);
+      }
+
       // If fatal errors happen we should kill the job immediately rather than
       // let the job retry several times, which eventually lead to failure.
       if (fatal) {
         continue; // wait until rj.isComplete
       }
 
       if (fatal = checkFatalErrors(th, errMsg)) {
-        success = false;
        console.printError("[Fatal Error] " + errMsg.toString()
            + ". Killing the job.");
        rj.killJob();
@@ -382,23 +392,31 @@
        reportTime = System.currentTimeMillis();
       }
     }
-    // check for fatal error again in case it occurred after the last check
-    // before the job is completed
-    if (!fatal && (fatal = checkFatalErrors(th, errMsg))) {
-      console.printError("[Fatal Error] " + errMsg.toString());
+
+    boolean success;
+    if (fatal) {
       success = false;
     } else {
-      success = rj.isSuccessful();
+      // check for fatal error again in case it occurred after
+      // the last check before the job is completed
+      if (checkFatalErrors(th, errMsg)) {
+        console.printError("[Fatal Error] " + errMsg.toString());
+        success = false;
+      } else {
+        success = rj.isSuccessful();
+      }
     }
 
     setDone();
-    th.setRunningJob(jc.getJob(rj.getJobID()));
+    // update based on the final value of the counters
     updateCounters(th);
+
    SessionState ss = SessionState.get();
    if (ss != null) {
      ss.getHiveHistory().logPlanProgress(queryPlan);
    }
    // LOG.info(queryPlan);
+    return (success);
   }
 
   /**
@@ -413,8 +431,9 @@
    taskCounters.put("CNTR_NAME_" + getId() + "_REDUCE_PROGRESS", Long
        .valueOf(reduceProgress));
    Counters ctrs = th.getCounters();
-    // HIVE-1422
    if (ctrs == null) {
+      // hadoop might return null if it cannot locate the job.
+      // we may still be able to retrieve the job status - so ignore
      return;
    }
    for (Operator op : work.getAliasToWork().values()) {
@@ -446,9 +465,8 @@
    */
   @Override
   public int execute(DriverContext driverContext) {
+    boolean success = true;
-    success = true;
-
     String invalidReason = work.isInvalid();
     if (invalidReason != null) {
       throw new RuntimeException("Plan invalid, Reason: " + invalidReason);
     }
@@ -554,7 +572,7 @@
     }
 
     int returnVal = 0;
-    RunningJob rj = null, orig_rj = null;
+    RunningJob rj = null;
     boolean noName = StringUtils.isEmpty(HiveConf.getVar(job,
         HiveConf.ConfVars.HADOOPJOBNAME));
 
@@ -581,7 +599,7 @@
      // make this client wait if job trcker is not behaving well.
      Throttle.checkJobTracker(job, LOG);
-      orig_rj = rj = jc.submitJob(job);
+      rj = jc.submitJob(job);
      // replace it back
      if (pwd != null) {
        HiveConf.setVar(job, HiveConf.ConfVars.METASTOREPWD, pwd);
      }
@@ -593,15 +611,8 @@
 
      ExecDriverTaskHandle th = new ExecDriverTaskHandle(jc, rj);
      jobInfo(rj);
-      progress(th); // success status will be setup inside progress
+      success = progress(th);
 
-      if (rj == null) {
-        // in the corner case where the running job has disappeared from JT
-        // memory remember that we did actually submit the job.
-        rj = orig_rj;
-        success = false;
-      }
-
      String statusMesg = getJobEndMsg(rj.getJobID());
      if (!success) {
        statusMesg += " with errors";
@@ -632,10 +643,12 @@
        if(ctxCreated)
          ctx.clear();
 
-        if (returnVal != 0 && rj != null) {
-          rj.killJob();
+        if (rj != null) {
+          if (returnVal != 0) {
+            rj.killJob();
+          }
+          runningJobKillURIs.remove(rj.getJobID());
        }
-        runningJobKillURIs.remove(rj.getJobID());
      } catch (Exception e) {
      }
    }