diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java index a5c0fcd..20ecbcd 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java @@ -117,6 +117,8 @@ protected transient JobConf job; public static MemoryMXBean memoryMXBean; protected HadoopJobExecHelper jobExecHelper; + private transient boolean isShutdown = false; + private transient boolean jobKilled = false; protected static transient final Logger LOG = LoggerFactory.getLogger(ExecDriver.class); @@ -413,10 +415,7 @@ public int execute(DriverContext driverContext) { if (driverContext.isShutdown()) { LOG.warn("Task was cancelled"); - if (rj != null) { - rj.killJob(); - rj = null; - } + killJob(); return 5; } @@ -449,7 +448,7 @@ public int execute(DriverContext driverContext) { if (rj != null) { if (returnVal != 0) { - rj.killJob(); + killJob(); } jobID = rj.getID().toString(); } @@ -857,22 +856,37 @@ public void logPlanProgress(SessionState ss) throws IOException { ss.getHiveHistory().logPlanProgress(queryPlan); } + public boolean isTaskShutdown() { + return isShutdown; + } + @Override public void shutdown() { super.shutdown(); - if (rj != null) { + killJob(); + isShutdown = true; + } + + @Override + public String getExternalHandle() { + return this.jobID; + } + + private void killJob() { + boolean needToKillJob = false; + synchronized(this) { + if (rj != null && !jobKilled) { + jobKilled = true; + needToKillJob = true; + } + } + if (needToKillJob) { try { rj.killJob(); } catch (Exception e) { LOG.warn("failed to kill job " + rj.getID(), e); } - rj = null; } } - - @Override - public String getExternalHandle() { - return this.jobID; - } }