diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java index 22f70243e6c7231abd22a358ac24cc417d39962e..fc4e4de2f9a5ad8d178f36d8e1e43f810648be19 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java @@ -174,6 +174,12 @@ public int startMonitor() { done = true; rc = 3; break; + case CANCELLED: + console.printInfo("Status: Cancelled"); + running = false; + done = true; + rc = 3; + break; } if (!done) { diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java index 928ecc00be06b766ae3548315661ecda5110f7d8..435c6b606b8b173b1d4d7053d2770bb6f983319d 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java @@ -20,6 +20,7 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import java.io.IOException; import java.util.ArrayList; @@ -27,10 +28,14 @@ import org.apache.hadoop.hive.common.metrics.common.Metrics; import org.apache.hadoop.hive.common.metrics.common.MetricsConstant; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.spark.status.RemoteSparkJobMonitor; +import org.apache.hadoop.hive.ql.exec.spark.status.impl.RemoteSparkJobStatus; import org.apache.hadoop.hive.ql.plan.BaseWork; import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.ql.plan.SparkWork; +import org.apache.hive.spark.client.JobHandle.State; import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; @@ -81,6 +86,17 @@ public void removeEmptySparkTask() { Assert.assertEquals(child1.getParentTasks().size(), 0); } + @Test + public void testRemoteSparkCancel() { + RemoteSparkJobStatus jobSts = Mockito.mock(RemoteSparkJobStatus.class); + when(jobSts.getRemoteJobState()).thenReturn(State.CANCELLED); + when(jobSts.isRemoteActive()).thenReturn(true); + HiveConf hiveConf = new HiveConf(); + RemoteSparkJobMonitor remoteSparkJobMonitor = new RemoteSparkJobMonitor(hiveConf, jobSts); + Assert.assertEquals(remoteSparkJobMonitor.startMonitor(), 3); + } + + private boolean isEmptySparkWork(SparkWork sparkWork) { List allWorks = sparkWork.getAllWork(); boolean allWorksIsEmtpy = true;