diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java index d8b517d48b..1e108c0c07 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java @@ -21,7 +21,6 @@ import java.lang.reflect.InvocationTargetException; import java.text.SimpleDateFormat; import java.util.ArrayList; -import java.util.Comparator; import java.util.Date; import java.util.HashSet; import java.util.LinkedHashSet; @@ -47,6 +46,7 @@ import org.apache.hadoop.hive.llap.daemon.FinishableStateUpdateHandler; import org.apache.hadoop.hive.llap.daemon.SchedulerFragmentCompletingListener; import org.apache.hadoop.hive.llap.daemon.impl.comparator.LlapQueueComparatorBase; +import org.apache.hadoop.hive.llap.daemon.impl.comparator.PreemptionQueueComparator; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec; import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics; @@ -1116,37 +1116,6 @@ private void shutdownExecutor(ExecutorService executorService) { } - - @VisibleForTesting - public static class PreemptionQueueComparator implements Comparator { - - @Override - public int compare(TaskWrapper t1, TaskWrapper t2) { - TaskRunnerCallable o1 = t1.getTaskRunnerCallable(); - TaskRunnerCallable o2 = t2.getTaskRunnerCallable(); - FragmentRuntimeInfo fri1 = o1.getFragmentRuntimeInfo(); - FragmentRuntimeInfo fri2 = o2.getFragmentRuntimeInfo(); - - // Regardless of other criteria, ducks are always more important than non-ducks. - boolean v1 = o1.isGuaranteed(), v2 = o2.isGuaranteed(); - if (v1 != v2) return v1 ? 1 : -1; - - // Then, non-finishable must always precede finishable. 
- v1 = o1.canFinishForPriority(); - v2 = o2.canFinishForPriority(); - if (v1 != v2) return v1 ? 1 : -1; - - // Otherwise, heuristics. - if (fri1.getNumSelfAndUpstreamTasks() > fri2.getNumSelfAndUpstreamTasks()) { - return 1; - } else if (fri1.getNumSelfAndUpstreamTasks() < fri2.getNumSelfAndUpstreamTasks()) { - return -1; - } - return 0; - } - } - - public static class TaskWrapper implements FinishableStateUpdateHandler { private final TaskRunnerCallable taskRunnerCallable; private final AtomicBoolean inWaitQueue = new AtomicBoolean(false); diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/PreemptionQueueComparator.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/PreemptionQueueComparator.java new file mode 100644 index 0000000000..bbe48caeb2 --- /dev/null +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/PreemptionQueueComparator.java @@ -0,0 +1,47 @@ +package org.apache.hadoop.hive.llap.daemon.impl.comparator; + +import org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorService; +import org.apache.hadoop.hive.llap.daemon.impl.TaskRunnerCallable; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; + +import java.util.Comparator; + +// Non-guaranteed tasks always have higher-priority for preemption and then +// tasks that can not Finish. 
If both tasks tie on the guaranteed and finishable checks +// we first preempt tasks that have fewer self-and-upstream tasks and then the ones that waited longer +public class PreemptionQueueComparator implements Comparator { + + @Override + public int compare(TaskExecutorService.TaskWrapper t1, TaskExecutorService.TaskWrapper t2) { + TaskRunnerCallable o1 = t1.getTaskRunnerCallable(); + TaskRunnerCallable o2 = t2.getTaskRunnerCallable(); + LlapDaemonProtocolProtos.FragmentRuntimeInfo fri1 = o1.getFragmentRuntimeInfo(); + LlapDaemonProtocolProtos.FragmentRuntimeInfo fri2 = o2.getFragmentRuntimeInfo(); + + // If one of the tasks is not guaranteed it is returned for preemption + boolean v1 = o1.isGuaranteed(), v2 = o2.isGuaranteed(); + if (v1 != v2) { + return v1 ? 1 : -1; + } + + // If one of the tasks is non-finishable it is returned for preemption + v1 = o1.canFinishForPriority(); + v2 = o2.canFinishForPriority(); + if (v1 != v2) { + return v1 ? 1 : -1; + } + + // The task that has the LEAST upstream tasks will be preempted first + if (fri1.getNumSelfAndUpstreamTasks() > fri2.getNumSelfAndUpstreamTasks()) { + return 1; + } else if (fri1.getNumSelfAndUpstreamTasks() < fri2.getNumSelfAndUpstreamTasks()) { + return -1; + } + + // When upstream tasks are the same, preempt the task that waited LONGER first + long waitTime1 = fri1.getCurrentAttemptStartTime() - fri1.getFirstAttemptStartTime(); + long waitTime2 = fri2.getCurrentAttemptStartTime() - fri2.getFirstAttemptStartTime(); + // TODO: Should we check equality as well? 
+ return Long.compare(waitTime2, waitTime1); + } +} diff --git llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java index ce9fce94de..318151fdb3 100644 --- llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java +++ llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java @@ -25,6 +25,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.MockitoAnnotations.initMocks; +import org.apache.hadoop.hive.llap.daemon.impl.comparator.PreemptionQueueComparator; import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics; import org.apache.hadoop.yarn.util.SystemClock; @@ -81,7 +82,7 @@ public void testPreemptionQueueComparator() throws InterruptedException { TaskWrapper r6 = createTaskWrapper( createSubmitWorkRequestProto(6, 8, 400, 500, true), false, 1000000); BlockingQueue queue = new PriorityBlockingQueue<>(6, - new TaskExecutorService.PreemptionQueueComparator()); + new PreemptionQueueComparator()); queue.offer(r6); queue.offer(r5); @@ -100,6 +101,38 @@ public void testPreemptionQueueComparator() throws InterruptedException { assertEquals(r6, queue.take()); } + @Test(timeout = 5000) + public void testPreemptionQueueEdgeCases() throws InterruptedException { + BlockingQueue queue = new PriorityBlockingQueue<>(5, + new PreemptionQueueComparator()); + + TaskWrapper r1 = createTaskWrapper( + createSubmitWorkRequestProto(1, 4, 100, 200, false), false, 100000); + TaskWrapper r2 = createTaskWrapper( + createSubmitWorkRequestProto(2, 2, 100, 300, false), false, 100000); + + // when both non-guaranteed and finishable, + // pick the one with the LEAST upstream tasks + queue.offer(r1); + queue.offer(r2); + assertEquals(r2, queue.peek()); + + queue.remove(r1); + queue.remove(r2); + assertTrue(queue.isEmpty()); + + r1 = createTaskWrapper( + 
createSubmitWorkRequestProto(1, 4, 100, 200, false), false, 100000); + r2 = createTaskWrapper( + createSubmitWorkRequestProto(2, 4, 100, 300, false), false, 100000); + + // when both non-guaranteed and finishable, with the same upstream + // pick the one with the LONGEST wait-time + queue.offer(r1); + queue.offer(r2); + assertEquals(r2, queue.peek()); + } + org.slf4j.Logger LOG = org.slf4j.LoggerFactory.getLogger(TestTaskExecutorService.class); @Test(timeout = 20000) public void testFinishablePreemptsNonFinishable() throws InterruptedException {