diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java index 6c7aca7be84555930f33dfa6490455ad12753f0e..1dc5aa2368f35901cebfd3b694f70cafb4efe7f6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java @@ -177,6 +177,12 @@ public int startMonitor() { done = true; rc = 3; break; + case CANCELLED: + console.printInfo("Status: Cancelled"); + running = false; + done = true; + rc = 3; + break; } if (!done) { diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java index 928ecc00be06b766ae3548315661ecda5110f7d8..2e618181824926c9ab6747603793ff2c289673b6 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java @@ -20,6 +20,7 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import java.io.IOException; import java.util.ArrayList; @@ -27,10 +28,14 @@ import org.apache.hadoop.hive.common.metrics.common.Metrics; import org.apache.hadoop.hive.common.metrics.common.MetricsConstant; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.spark.status.RemoteSparkJobMonitor; +import org.apache.hadoop.hive.ql.exec.spark.status.impl.RemoteSparkJobStatus; import org.apache.hadoop.hive.ql.plan.BaseWork; import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.ql.plan.SparkWork; +import org.apache.hive.spark.client.JobHandle.State; import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; @@ -81,6 +86,16 @@ public void removeEmptySparkTask() { Assert.assertEquals(child1.getParentTasks().size(), 0); } + @Test + public void testRemoteSparkCancel() { + RemoteSparkJobStatus jobSts = Mockito.mock(RemoteSparkJobStatus.class); + when(jobSts.getRemoteJobState()).thenReturn(State.CANCELLED); + HiveConf hiveConf = new HiveConf(); + RemoteSparkJobMonitor remoteSparkJobMonitor = new RemoteSparkJobMonitor(hiveConf, jobSts); + Assert.assertEquals(remoteSparkJobMonitor.startMonitor(),3); + } + + private boolean isEmptySparkWork(SparkWork sparkWork) { List allWorks = sparkWork.getAllWork(); boolean allWorksIsEmtpy = true;