diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java index 246989467c..93b59dcb4e 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java @@ -222,6 +222,20 @@ public synchronized void setCapacity(int newNumExecutors, int newWaitQueueSize) waitQueue.setWaitQueueSize(newWaitQueueSize); metrics.setNumExecutors(newNumExecutors); metrics.setWaitQueueSize(newWaitQueueSize); + // If there is no executor left so the queued tasks can not be finished anyway, kill them all. + if (newNumExecutors == 0) { + synchronized (lock) { + TaskWrapper task = waitQueue.peek(); + while (task != null) { + LOG.info("Killing task [" + task + "], since no executor left."); + task.getTaskRunnerCallable().killTask(); + if (waitQueue.remove(task)) { + metrics.setExecutorNumQueuedRequests(waitQueue.size()); + } + task = waitQueue.peek(); + } + } + } LOG.info("TaskExecutorService is setting capacity to: numExecutors=" + newNumExecutors + ", waitQueueSize=" + newWaitQueueSize); } diff --git llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java index 948a678f83..ce9fce94de 100644 --- llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java +++ llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java @@ -531,6 +531,46 @@ public void testSetCapacity() throws InterruptedException { } } + @Test(timeout = 1000) + public void testZeroCapacity() throws InterruptedException { + TaskExecutorServiceForTest taskExecutorService = + new TaskExecutorServiceForTest(1, 1, ShortestJobFirstComparator.class.getName(), true, mockMetrics); + + // Fourth is lower priority as a result of canFinish being set to false. + MockRequest r1 = createMockRequest(1, 1, 1, 100, 200, true, 20000L, true); + MockRequest r2 = createMockRequest(2, 1, 2, 100, 200, true, 20000L, true); + + taskExecutorService.init(new Configuration()); + taskExecutorService.start(); + + try { + Scheduler.SubmissionState submissionState; + // Schedule the first 2 tasks (1 to execute, 1 to the queue) + submissionState = taskExecutorService.schedule(r1); + assertEquals(Scheduler.SubmissionState.ACCEPTED, submissionState); + + submissionState = taskExecutorService.schedule(r2); + assertEquals(Scheduler.SubmissionState.ACCEPTED, submissionState); + + awaitStartAndSchedulerRun(r1, taskExecutorService); + + taskExecutorService.setCapacity(0, 0); + + // The queued task should be killed + assertTrue(r2.wasPreempted()); + + // The already running should be able to finish + assertFalse(r1.wasPreempted()); + r1.complete(); + r1.awaitEnd(); + TaskExecutorServiceForTest.InternalCompletionListenerForTest icl = + taskExecutorService.getInternalCompletionListenerForTest(r1.getRequestId()); + icl.awaitCompletion(); + } finally { + taskExecutorService.shutDown(false); + } + } + @Test(timeout = 1000, expected = IllegalArgumentException.class) public void testSetCapacityHighExecutors() { TaskExecutorServiceForTest taskExecutorService =