diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
index 98b160530a..b4fb49fdc3 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
@@ -85,6 +85,7 @@
   private transient List stageIds;
   private transient SparkJobRef jobRef = null;
   private transient boolean isShutdown = false;
+  private transient boolean jobKilled = false;
 
   @Override
   public void initialize(QueryState queryState, QueryPlan queryPlan, DriverContext driverContext,
@@ -112,6 +113,11 @@ public int execute(DriverContext driverContext) {
       jobRef = sparkSession.submit(driverContext, sparkWork);
       perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_SUBMIT_JOB);
 
+      if (driverContext.isShutdown()) {
+        killJob();
+        throw new HiveException("Operation is cancelled.");
+      }
+
       addToHistory(jobRef);
       sparkJobID = jobRef.getJobId();
       this.jobID = jobRef.getSparkJobStatus().getAppID();
@@ -130,11 +136,11 @@ public int execute(DriverContext driverContext) {
         // ideally also cancel the app request here. But w/o facilities from Spark or YARN,
         // it's difficult to do it on hive side alone. See HIVE-12650.
         LOG.info("Failed to submit Spark job " + sparkJobID);
-        jobRef.cancelJob();
+        killJob();
       } else if (rc == 4) {
         LOG.info("The number of tasks reaches above the limit " + conf.getIntVar(HiveConf.ConfVars.SPARK_JOB_MAX_TASKS) +
             ". Cancelling Spark job " + sparkJobID + " with application ID " + jobID );
-        jobRef.cancelJob();
+        killJob();
       }
 
       if (this.jobID == null) {
@@ -305,14 +311,27 @@ public boolean isTaskShutdown() {
   @Override
   public void shutdown() {
     super.shutdown();
-    if (jobRef != null && !isShutdown) {
+    killJob();
+    isShutdown = true;
+  }
+
+  private void killJob() {
+    boolean needToKillJob = false;
+    if (jobRef != null && !jobKilled) {
+      synchronized (this) {
+        if (!jobKilled) {
+          jobKilled = true;
+          needToKillJob = true;
+        }
+      }
+    }
+    if (needToKillJob) {
       try {
         jobRef.cancelJob();
       } catch (Exception e) {
         LOG.warn("failed to kill job", e);
       }
     }
-    isShutdown = true;
   }
 
   /**
@@ -393,6 +412,11 @@ private void getSparkJobInfo(SparkJobStatus sparkJobStatus, int rc) {
     if (rc != 0) {
       Throwable error = sparkJobStatus.getError();
       if (error != null) {
+        if ((error instanceof InterruptedException) ||
+            (error instanceof HiveException &&
+            error.getCause() instanceof InterruptedException)) {
+          killJob();
+        }
         setException(error);
       }
     }
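
Note on the change (not part of the patch): the core of this diff is that every cancellation path now funnels through a single killJob() helper guarded by a jobKilled flag with a synchronized double-check, so the underlying Spark job is cancelled at most once even if shutdown() and execute() race. Below is a minimal, self-contained Java sketch of that pattern under stated assumptions; CancelOnceDemo and the JobRef interface are hypothetical stand-ins for SparkTask and SparkJobRef, not code from the patch.

// Sketch of the "cancel at most once" pattern used by the new killJob():
// the unsynchronized outer check is only a fast path, and the synchronized
// double-check is what guarantees cancelJob() runs a single time even when
// two threads call killJob() concurrently.
public class CancelOnceDemo {

  // Minimal stand-in for SparkJobRef; only the cancel operation matters here.
  interface JobRef {
    void cancelJob() throws Exception;
  }

  private final JobRef jobRef;
  private boolean jobKilled = false;

  CancelOnceDemo(JobRef jobRef) {
    this.jobRef = jobRef;
  }

  void killJob() {
    boolean needToKillJob = false;
    if (jobRef != null && !jobKilled) {   // fast path: skip the lock if already killed
      synchronized (this) {
        if (!jobKilled) {                 // re-check under the lock; only one caller wins
          jobKilled = true;
          needToKillJob = true;
        }
      }
    }
    if (needToKillJob) {                  // cancel outside the lock, mirroring the patch
      try {
        jobRef.cancelJob();
      } catch (Exception e) {
        System.err.println("failed to kill job: " + e);
      }
    }
  }

  public static void main(String[] args) throws InterruptedException {
    CancelOnceDemo demo = new CancelOnceDemo(() -> System.out.println("cancelJob() called"));
    // Two threads race to kill the same job; "cancelJob() called" prints exactly once.
    Thread t1 = new Thread(demo::killJob);
    Thread t2 = new Thread(demo::killJob);
    t1.start();
    t2.start();
    t1.join();
    t2.join();
  }
}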