diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index cdf24d4209b51f5fa901de5f3bbec6306a9c861a..d981119d3f6eb8fba66bf7c16aee838280d1c969 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -655,7 +655,6 @@ private boolean isInterrupted() {
     lDrvState.stateLock.lock();
     try {
       if (lDrvState.driverState == DriverState.INTERRUPT) {
-        Thread.currentThread().interrupt();
         return true;
       } else {
         return false;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
index 34b683c52049add52fbc73c98ed5181c182b4441..1945163a0e2cfce53ee75c742143367ad23f97ed 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
@@ -225,6 +225,11 @@ public int execute(DriverContext driverContext) {
     Path emptyScratchDir;
     JobClient jc = null;
 
+    if (driverContext.isShutdown()) {
+      LOG.warn("Task was cancelled");
+      return 5;
+    }
+
     MapWork mWork = work.getMapWork();
     ReduceWork rWork = work.getReduceWork();
 
@@ -398,7 +403,22 @@ public int execute(DriverContext driverContext) {
       HiveConfUtil.updateJobCredentialProviders(job);
 
       // Finally SUBMIT the JOB!
+      if (driverContext.isShutdown()) {
+        LOG.warn("Task was cancelled");
+        return 5;
+      }
+
       rj = jc.submitJob(job);
+
+      if (driverContext.isShutdown()) {
+        LOG.warn("Task was cancelled");
+        if (rj != null) {
+          rj.killJob();
+          rj = null;
+        }
+        return 5;
+      }
+
       this.jobID = rj.getJobID();
       updateStatusInQueryDisplay();
       returnVal = jobExecHelper.progress(rj, jc, ctx);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
index f5d9c4c079d40539143e893e529d3e5435db8cf9..beeafd0672ce75796e98650a3f394492cb132efe 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
@@ -36,6 +36,7 @@
 import org.apache.hadoop.hive.ql.exec.spark.status.impl.LocalSparkJobRef;
 import org.apache.hadoop.hive.ql.exec.spark.status.impl.LocalSparkJobStatus;
 import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
 import org.apache.hadoop.hive.ql.session.SessionState;
@@ -135,6 +136,10 @@ public SparkJobRef execute(DriverContext driverContext, SparkWork sparkWork) thr
         new SparkPlanGenerator(sc, ctx, jobConf, emptyScratchDir, sparkReporter);
     SparkPlan plan = gen.generate(sparkWork);
 
+    if (driverContext.isShutdown()) {
+      throw new HiveException("Operation is cancelled.");
+    }
+
     // Execute generated plan.
     JavaPairRDD<HiveKey, BytesWritable> finalRDD = plan.generateGraph();
     // We use Spark RDD async action to submit job as it's the only way to get jobId now.
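
Note (illustration, not part of the patch): the ExecDriver hunks above all follow one shape: check DriverContext.isShutdown() before submitting, submit, then check again and kill the job if a cancel request raced with submission. A minimal sketch of that shape, assuming hypothetical stand-ins (CancellableContext, SubmittedJob, Submitter) for DriverContext, org.apache.hadoop.mapred.RunningJob and JobClient:

public class CancellationGuardSketch {

  /** Hypothetical stand-in for DriverContext. */
  interface CancellableContext {
    boolean isShutdown();
  }

  /** Hypothetical stand-in for org.apache.hadoop.mapred.RunningJob. */
  interface SubmittedJob {
    void killJob() throws Exception;
  }

  /** Hypothetical stand-in for JobClient.submitJob(JobConf). */
  interface Submitter {
    SubmittedJob submitJob() throws Exception;
  }

  /** Returns 0 on success and 5 when cancelled, the same code the patch returns. */
  static int submitGuarded(CancellableContext driverContext, Submitter jc) throws Exception {
    if (driverContext.isShutdown()) {
      return 5;            // cancelled before anything was submitted
    }
    SubmittedJob rj = jc.submitJob();
    if (driverContext.isShutdown()) {
      if (rj != null) {
        rj.killJob();      // don't leave an orphaned job running on the cluster
      }
      return 5;
    }
    return 0;              // caller goes on to monitor progress
  }
}

The second check is what makes the guard race-safe: a cancel request that arrives while submitJob() is in flight is still honored, and the just-started job is killed instead of being left to run after the query has been torn down.
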
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
index 6caf2b75268857aee4d23204c17f7ac3d59ab265..4c698994e7e970811f68c4123a2eacd1ce158a10 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
@@ -49,6 +49,7 @@
 import org.apache.hadoop.hive.ql.exec.spark.status.impl.RemoteSparkJobRef;
 import org.apache.hadoop.hive.ql.exec.spark.status.impl.RemoteSparkJobStatus;
 import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
 import org.apache.hadoop.hive.ql.session.SessionState;
@@ -207,6 +208,10 @@ private SparkJobRef submit(final DriverContext driverContext, final SparkWork sp
     byte[] sparkWorkBytes = KryoSerializer.serialize(sparkWork);
 
     JobStatusJob job = new JobStatusJob(jobConfBytes, scratchDirBytes, sparkWorkBytes);
+    if (driverContext.isShutdown()) {
+      throw new HiveException("Operation is cancelled.");
+    }
+
     JobHandle<Serializable> jobHandle = remoteClient.submit(job);
     RemoteSparkJobStatus sparkJobStatus = new RemoteSparkJobStatus(remoteClient, jobHandle, sparkClientTimtout);
     return new RemoteSparkJobRef(hiveConf, jobHandle, sparkJobStatus);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java
index 6ca05ede1aec0364e6c4ea5b306c14ea366581dc..c2a48061edf1762b5fcaaf5bd5aa9b351749ac68 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java
@@ -486,12 +486,26 @@ static void unlockPrimitive(HiveLock hiveLock, String parent, CuratorFramework c
     HiveLockObject obj = zLock.getHiveLockObject();
     String name = getLastObjectName(parent, obj);
     try {
-      curatorFramework.delete().forPath(zLock.getPath());
+      //catch InterruptedException to make sure locks can be released when the query is cancelled.
+      try {
+        curatorFramework.delete().forPath(zLock.getPath());
+      } catch (InterruptedException ie) {
+        curatorFramework.delete().forPath(zLock.getPath());
+      }
 
       // Delete the parent node if all the children have been deleted
-      List<String> children = curatorFramework.getChildren().forPath(name);
+      List<String> children = null;
+      try {
+        children = curatorFramework.getChildren().forPath(name);
+      } catch (InterruptedException ie) {
+        children = curatorFramework.getChildren().forPath(name);
+      }
       if (children == null || children.isEmpty()) {
-        curatorFramework.delete().forPath(name);
+        try {
+          curatorFramework.delete().forPath(name);
+        } catch (InterruptedException ie) {
+          curatorFramework.delete().forPath(name);
+        }
       }
       Metrics metrics = MetricsFactory.getInstance();
       if (metrics != null) {
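
Note (illustration, not part of the patch): the ZooKeeperHiveLockManager hunk retries each Curator call once when it fails with InterruptedException, so ZooKeeper lock nodes are still deleted when the query thread was interrupted by a cancel request. A minimal sketch of that retry-once pattern, assuming a Callable-based helper that is not in Hive:

import java.util.concurrent.Callable;

public final class RetryOnceOnInterruptSketch {

  /**
   * Runs the action; if it fails with InterruptedException (for example a
   * Curator/ZooKeeper call aborted because the calling thread was interrupted),
   * runs it once more. A blocking call that throws InterruptedException has
   * normally already cleared the thread's interrupt status, so the retry has a
   * chance to complete.
   */
  static <T> T retryOnceOnInterrupt(Callable<T> action) throws Exception {
    try {
      return action.call();
    } catch (InterruptedException ie) {
      return action.call();
    }
  }

  // Shaped like the unlock path in the patch; curatorFramework, zLock and name
  // are assumed to be in scope as they are in unlockPrimitive():
  //
  //   retryOnceOnInterrupt(() -> curatorFramework.delete().forPath(zLock.getPath()));
  //   List<String> children =
  //       retryOnceOnInterrupt(() -> curatorFramework.getChildren().forPath(name));
  //   if (children == null || children.isEmpty()) {
  //     retryOnceOnInterrupt(() -> curatorFramework.delete().forPath(name));
  //   }
}

This complements the Driver.java change at the top of the patch, which no longer re-interrupts the current thread inside isInterrupted(), so a cancelled query's cleanup path is not immediately interrupted again while it releases its locks.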