commit a0fe46d147c3ad50e489f43bb3094b717934bd95
Author: Mithun RK
Date:   Tue Aug 8 14:01:01 2017 -0700

    HIVE-17273: MergeFileTask needs to be interruptible.

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java b/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java
index da99c23..92c16c7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java
@@ -37,10 +37,10 @@
 import org.apache.hadoop.hive.ql.exec.mr.Throttle;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormatImpl;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.Counters;
 import org.apache.hadoop.mapred.FileInputFormat;
@@ -62,6 +62,11 @@
   private HadoopJobExecHelper jobExecHelper;
   private boolean success = true;
 
+  // Hooks for cancellation.
+  private volatile boolean isShutDown = false;
+  private final Object runningJobLock = new Object();
+  private RunningJob runningJob = null;
+
   @Override
   public void initialize(QueryState queryState, QueryPlan queryPlan,
       DriverContext driverContext, CompilationOpContext opContext) {
@@ -152,8 +157,30 @@ public int execute(DriverContext driverContext) {
       // make this client wait if job trcker is not behaving well.
       Throttle.checkJobTracker(job, LOG);
 
+      // Before submitting, check that the task wasn't shut down.
+      if (this.isShutDown) {
+        throw new HiveException("MergeFiles operation cancelled!");
+      }
+
       // Finally SUBMIT the JOB!
       rj = jc.submitJob(job);
+
+      boolean wasShutDown;
+      synchronized (runningJobLock) {
+        assert this.runningJob == null : "runningJob shouldn't have been set yet.";
+        wasShutDown = this.isShutDown;
+
+        if (!wasShutDown) {
+          // Task was not interrupted (i.e. shutdown()) since the last check.
+          this.runningJob = rj;
+        }
+      }
+
+      if (wasShutDown) {
+        // Cancellation of runningJob happens in finally{}.
+        throw new HiveException("MergeFiles operation cancelled!");
+      }
+
       this.jobID = rj.getJobID();
       returnVal = jobExecHelper.progress(rj, jc, ctx);
       success = (returnVal == 0);
@@ -164,7 +191,7 @@ public int execute(DriverContext driverContext) {
       if (rj != null) {
         mesg = "Ended Job = " + rj.getJobID() + mesg;
       } else {
-        mesg = "Job Submission failed" + mesg;
+        mesg = "Job Submission failed, or was interrupted" + mesg;
       }
 
       // Has to use full name to make sure it does not conflict with
@@ -179,11 +206,11 @@ public int execute(DriverContext driverContext) {
       if (ctxCreated) {
         ctx.clear();
       }
-      if (rj != null) {
-        if (returnVal != 0) {
-          rj.killJob();
-        }
+
+      if (!success) {
+        cancelRunningJob(rj);
       }
+
       // get the list of Dynamic partition paths
       if (rj != null) {
         if (work.getAliasToWork() != null) {
@@ -194,8 +221,8 @@ public int execute(DriverContext driverContext) {
         }
       }
     } catch (Exception e) {
-      // jobClose needs to execute successfully otherwise fail task
-      LOG.warn("Job close failed ",e);
+      // jobClose needs to execute successfully otherwise fail task
+      LOG.warn("Job close failed ", e);
       if (success) {
         setException(e);
         success = false;
@@ -206,7 +233,7 @@ public int execute(DriverContext driverContext) {
             org.apache.hadoop.util.StringUtils.stringifyException(e));
       }
     } finally {
-      HadoopJobExecHelper.runningJobs.remove(rj);
+      HadoopJobExecHelper.runningJobs.remove(rj);
     }
   }
 
@@ -220,6 +247,33 @@ private void addInputPaths(JobConf job, MergeFileWork work) {
   }
 
   @Override
+  public void shutdown() {
+    super.shutdown();
+    RunningJob runningJob;
+    synchronized (runningJobLock) {
+      this.isShutDown = true;
+      runningJob = this.runningJob;
+    }
+    LOG.info("Shutting down MergeFileTask " + this + (runningJob == null ? " before submit." : ""));
+    cancelRunningJob(runningJob);
+  }
+
+  private void cancelRunningJob(RunningJob runningJob) {
+    try {
+      if (runningJob != null) {
+        runningJob.killJob();
+        LOG.info("Killed job: " + runningJob.getJobID() + " for MergeFileTask " + this);
+      }
+      else {
+        LOG.info("No job to kill. " + this);
+      }
+    }
+    catch (IOException exception) {
+      LOG.warn("Could not kill job: " + runningJob.getJobID() + " for MergeFileTask " + this, exception);
+    }
+  }
+
+  @Override
   public String getName() {
     return "MergeFileTask";
   }
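
For readers tracing the concurrency logic, the handshake this patch sets up between execute() and shutdown() can be distilled into the standalone sketch below. The Job interface, the Supplier standing in for job submission, and IllegalStateException are hypothetical placeholders for Hadoop's JobClient.submitJob()/RunningJob.killJob() and Hive's HiveException; only the flag-plus-lock ordering is taken from the patch.

import java.util.function.Supplier;

// Minimal sketch of the cancellation handshake from HIVE-17273, under the
// assumptions stated above. Job, Supplier<Job>, and IllegalStateException are
// stand-ins; only the locking/ordering pattern mirrors the patch.
public class CancellableSubmitter {

  interface Job {
    void kill();
  }

  private volatile boolean isShutDown = false;
  private final Object runningJobLock = new Object();
  private Job runningJob = null; // guarded by runningJobLock

  // Worker side: mirrors execute() in the patch.
  public void execute(Supplier<Job> submitter) {
    // Cheap pre-submit check; volatile makes an earlier shutdown() visible.
    if (isShutDown) {
      throw new IllegalStateException("Cancelled before submit.");
    }

    Job job = submitter.get(); // the job now exists cluster-side

    boolean wasShutDown;
    synchronized (runningJobLock) {
      wasShutDown = isShutDown;
      if (!wasShutDown) {
        // Publish the handle so a later shutdown() can kill the job.
        runningJob = job;
      }
    }
    if (wasShutDown) {
      // shutdown() raced in after submit but saw a null handle, so it could
      // not kill the job; this thread must do it. (The patch defers this to
      // its cleanup path via success == false and cancelRunningJob(rj).)
      job.kill();
      throw new IllegalStateException("Cancelled after submit.");
    }
    // ... monitor job progress here ...
  }

  // Caller side: mirrors shutdown() in the patch.
  public void shutdown() {
    Job job;
    synchronized (runningJobLock) {
      isShutDown = true; // seen by both checks in execute()
      job = runningJob;  // non-null only if execute() already published it
    }
    if (job != null) {
      job.kill();
    }
  }
}

The design point the sketch isolates: the volatile flag alone suffices for the cheap pre-submit check, but once a job has been submitted, the flag write and the handle exchange must happen atomically under runningJobLock. Without that, shutdown() could set the flag and read a still-null handle while execute() reads a still-false flag and publishes the handle, leaving a submitted job that neither side ever kills. The lock guarantees that in every interleaving exactly one side takes responsibility for the kill.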