diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java index 32a7730..d2313cc 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java @@ -85,6 +85,7 @@ private transient List stageIds; private transient SparkJobRef jobRef = null; private transient boolean isShutdown = false; + private transient boolean jobKilled = false; @Override public void initialize(QueryState queryState, QueryPlan queryPlan, DriverContext driverContext, @@ -112,6 +113,11 @@ public int execute(DriverContext driverContext) { jobRef = sparkSession.submit(driverContext, sparkWork); perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_SUBMIT_JOB); + if (driverContext.isShutdown()) { + killJob(); + throw new HiveException("Operation is cancelled."); + } + addToHistory(jobRef); sparkJobID = jobRef.getJobId(); this.jobID = jobRef.getSparkJobStatus().getAppID(); @@ -129,7 +135,7 @@ public int execute(DriverContext driverContext) { // TODO: If the timeout is because of lack of resources in the cluster, we should // ideally also cancel the app request here. But w/o facilities from Spark or YARN, // it's difficult to do it on hive side alone. See HIVE-12650. - jobRef.cancelJob(); + killJob(); } if (this.jobID == null) { this.jobID = sparkJobStatus.getAppID(); @@ -299,14 +305,25 @@ public boolean isTaskShutdown() { @Override public void shutdown() { super.shutdown(); - if (jobRef != null && !isShutdown) { + killJob(); + isShutdown = true; + } + + private void killJob() { + boolean needToKillJob = false; + synchronized(this) { + if (jobRef != null && !jobKilled) { + jobKilled = true; + needToKillJob = true; + } + } + if (needToKillJob) { try { jobRef.cancelJob(); } catch (Exception e) { LOG.warn("failed to kill job", e); } } - isShutdown = true; } /** @@ -387,6 +404,9 @@ private void getSparkJobInfo(SparkJobStatus sparkJobStatus, int rc) { if (rc != 0) { Throwable error = sparkJobStatus.getError(); if (error != null) { + if (error instanceof InterruptedException) { + killJob(); + } setException(error); } }