diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
index 4c01329..32a7730 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
@@ -83,6 +83,8 @@
   private transient int totalTaskCount;
   private transient int failedTaskCount;
   private transient List<Integer> stageIds;
+  private transient SparkJobRef jobRef = null;
+  private transient boolean isShutdown = false;
 
   @Override
   public void initialize(QueryState queryState, QueryPlan queryPlan, DriverContext driverContext,
@@ -107,7 +109,7 @@ public int execute(DriverContext driverContext) {
       perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_SUBMIT_JOB);
       submitTime = perfLogger.getStartTime(PerfLogger.SPARK_SUBMIT_JOB);
-      SparkJobRef jobRef = sparkSession.submit(driverContext, sparkWork);
+      jobRef = sparkSession.submit(driverContext, sparkWork);
       perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_SUBMIT_JOB);
 
       addToHistory(jobRef);
@@ -290,6 +292,23 @@ public long getFinishTime() {
     return finishTime;
   }
 
+  public boolean isTaskShutdown() {
+    return isShutdown;
+  }
+
+  @Override
+  public void shutdown() {
+    super.shutdown();
+    if (jobRef != null && !isShutdown) {
+      try {
+        jobRef.cancelJob();
+      } catch (Exception e) {
+        LOG.warn("failed to kill job", e);
+      }
+    }
+    isShutdown = true;
+  }
+
   /**
    * Set the number of reducers for the spark work.
    */
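
For context, the patch promotes the job handle returned by sparkSession.submit(...) from a local variable to a field so that a later shutdown() can cancel the in-flight Spark job, and it guards the cancel with an isShutdown flag so it runs at most once, logging (rather than propagating) any failure from cancelJob(). Below is a minimal, self-contained sketch of that guarded-cancellation pattern, not Hive code: JobRef is a hypothetical stand-in for SparkJobRef, and the AtomicBoolean is an assumption added here to make the once-only guard safe under concurrent shutdown() calls (the patch itself uses a plain boolean field plus the check in the base Task).

import java.util.concurrent.atomic.AtomicBoolean;

public class CancellableTask {

  // Hypothetical stand-in for Hive's SparkJobRef: anything whose job can be cancelled.
  interface JobRef {
    void cancelJob() throws Exception;
  }

  private volatile JobRef jobRef = null;
  private final AtomicBoolean isShutdown = new AtomicBoolean(false);

  // Mirrors "jobRef = sparkSession.submit(driverContext, sparkWork)": keep the
  // handle in a field so shutdown() can reach it after submission.
  public void onSubmitted(JobRef ref) {
    this.jobRef = ref;
  }

  public boolean isTaskShutdown() {
    return isShutdown.get();
  }

  // Idempotent: the compare-and-set ensures cancelJob() is attempted at most once,
  // and a failed cancel is logged but never aborts the shutdown path.
  public void shutdown() {
    if (isShutdown.compareAndSet(false, true) && jobRef != null) {
      try {
        jobRef.cancelJob();
      } catch (Exception e) {
        System.err.println("failed to kill job: " + e);
      }
    }
  }

  public static void main(String[] args) {
    CancellableTask task = new CancellableTask();
    task.onSubmitted(() -> System.out.println("cancelJob() invoked"));
    task.shutdown();   // cancels the job
    task.shutdown();   // no-op: already shut down
    System.out.println("shutdown? " + task.isTaskShutdown());
  }
}

One design note on the patch as written: because shutdown() sets isShutdown only after attempting the cancel, a concurrent second call could still race past the !isShutdown check; the sketch's compare-and-set closes that window, at the cost of marking the task shut down even if cancelJob() throws, which matches the patch's own behavior of setting isShutdown unconditionally.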