diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java index d8b517d48b..1e108c0c07 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java @@ -21,7 +21,6 @@ import java.lang.reflect.InvocationTargetException; import java.text.SimpleDateFormat; import java.util.ArrayList; -import java.util.Comparator; import java.util.Date; import java.util.HashSet; import java.util.LinkedHashSet; @@ -47,6 +46,7 @@ import org.apache.hadoop.hive.llap.daemon.FinishableStateUpdateHandler; import org.apache.hadoop.hive.llap.daemon.SchedulerFragmentCompletingListener; import org.apache.hadoop.hive.llap.daemon.impl.comparator.LlapQueueComparatorBase; +import org.apache.hadoop.hive.llap.daemon.impl.comparator.PreemptionQueueComparator; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec; import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics; @@ -1116,37 +1116,6 @@ private void shutdownExecutor(ExecutorService executorService) { } - - @VisibleForTesting - public static class PreemptionQueueComparator implements Comparator { - - @Override - public int compare(TaskWrapper t1, TaskWrapper t2) { - TaskRunnerCallable o1 = t1.getTaskRunnerCallable(); - TaskRunnerCallable o2 = t2.getTaskRunnerCallable(); - FragmentRuntimeInfo fri1 = o1.getFragmentRuntimeInfo(); - FragmentRuntimeInfo fri2 = o2.getFragmentRuntimeInfo(); - - // Regardless of other criteria, ducks are always more important than non-ducks. - boolean v1 = o1.isGuaranteed(), v2 = o2.isGuaranteed(); - if (v1 != v2) return v1 ? 1 : -1; - - // Then, non-finishable must always precede finishable. - v1 = o1.canFinishForPriority(); - v2 = o2.canFinishForPriority(); - if (v1 != v2) return v1 ? 1 : -1; - - // Otherwise, heuristics. - if (fri1.getNumSelfAndUpstreamTasks() > fri2.getNumSelfAndUpstreamTasks()) { - return 1; - } else if (fri1.getNumSelfAndUpstreamTasks() < fri2.getNumSelfAndUpstreamTasks()) { - return -1; - } - return 0; - } - } - - public static class TaskWrapper implements FinishableStateUpdateHandler { private final TaskRunnerCallable taskRunnerCallable; private final AtomicBoolean inWaitQueue = new AtomicBoolean(false); diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/PreemptionQueueComparator.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/PreemptionQueueComparator.java new file mode 100644 index 0000000000..2ef865a7d4 --- /dev/null +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/PreemptionQueueComparator.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.llap.daemon.impl.comparator; + +import org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorService; +import org.apache.hadoop.hive.llap.daemon.impl.TaskRunnerCallable; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; + +import java.util.Comparator; + +// Non-guaranteed tasks always have higher-priority for preemption and then +// tasks that can not Finish. If both tasks are non-guaranteed and canNotFinish +// we first preempt the task that would loose less work (lower completed percentage) +// and finally, if tasks have done the same ammount of progress pick the one that waited longer +public class PreemptionQueueComparator implements Comparator { + + @Override + public int compare(TaskExecutorService.TaskWrapper t1, TaskExecutorService.TaskWrapper t2) { + TaskRunnerCallable o1 = t1.getTaskRunnerCallable(); + TaskRunnerCallable o2 = t2.getTaskRunnerCallable(); + LlapDaemonProtocolProtos.FragmentRuntimeInfo fri1 = o1.getFragmentRuntimeInfo(); + LlapDaemonProtocolProtos.FragmentRuntimeInfo fri2 = o2.getFragmentRuntimeInfo(); + + // If one of the tasks is not guaranteed it is returned for preemption + boolean v1 = o1.isGuaranteed(), v2 = o2.isGuaranteed(); + if (v1 != v2) { + return v1 ? 1 : -1; + } + + // If one of the tasks is non-finishable it is returned for preemption + v1 = o1.canFinishForPriority(); + v2 = o2.canFinishForPriority(); + if (v1 != v2) { + return v1 ? 1 : -1; + } + + // The task that has the LEAST complete percentage will be preempted first (less lost work) + double completePercentTask1 = (double) fri1.getNumSelfAndUpstreamCompletedTasks() / + (double) fri1.getNumSelfAndUpstreamTasks(); + double completePercentTask2 = (double) fri2.getNumSelfAndUpstreamCompletedTasks() / + (double) fri2.getNumSelfAndUpstreamTasks(); + + if (completePercentTask1 > completePercentTask2) { + return 1; + } else if (completePercentTask1 < completePercentTask2) { + return -1; + } + + // When completion percentage is the same, preempt the task that waited LESS first + long waitTime1 = fri1.getCurrentAttemptStartTime() - fri1.getFirstAttemptStartTime(); + long waitTime2 = fri2.getCurrentAttemptStartTime() - fri2.getFirstAttemptStartTime(); + // TODO: Should we check equality as well? + return Long.compare(waitTime2, waitTime1); + } +} diff --git llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java index ce9fce94de..41d538d881 100644 --- llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java +++ llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java @@ -25,6 +25,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.MockitoAnnotations.initMocks; +import org.apache.hadoop.hive.llap.daemon.impl.comparator.PreemptionQueueComparator; import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics; import org.apache.hadoop.yarn.util.SystemClock; @@ -81,7 +82,7 @@ public void testPreemptionQueueComparator() throws InterruptedException { TaskWrapper r6 = createTaskWrapper( createSubmitWorkRequestProto(6, 8, 400, 500, true), false, 1000000); BlockingQueue queue = new PriorityBlockingQueue<>(6, - new TaskExecutorService.PreemptionQueueComparator()); + new PreemptionQueueComparator()); queue.offer(r6); queue.offer(r5); @@ -100,6 +101,53 @@ public void testPreemptionQueueComparator() throws InterruptedException { assertEquals(r6, queue.take()); } + @Test(timeout = 5000) + public void testPreemptionQueueEdgeCases() throws InterruptedException { + BlockingQueue queue = new PriorityBlockingQueue<>(5, + new PreemptionQueueComparator()); + + TaskWrapper r1 = createTaskWrapper( + createSubmitWorkRequestProto(1, 4, 100, 200, false), false, 100000); + TaskWrapper r2 = createTaskWrapper( + createSubmitWorkRequestProto(2, 2, 100, 300, false), false, 100000); + + // when both non-guaranteed and finishable, + // pick the one with the LEAST upstream tasks + queue.offer(r1); + queue.offer(r2); + assertEquals(r2, queue.peek()); + + queue.remove(r1); + queue.remove(r2); + assertTrue(queue.isEmpty()); + + r1 = createTaskWrapper( + createSubmitWorkRequestProto(1, 10, 2, 100, 200, 1, "q1", false), false, 100000); + r2 = createTaskWrapper( + createSubmitWorkRequestProto(2, 10, 4, 100, 300, 1, "q2", false), false, 100000); + + // when both non-guaranteed and finishable, preempt the one the LEAST completed work + // E.g., Q1 is 20% complete while Q2 is 40% complete (Q2 would loose more work) + queue.offer(r1); + queue.offer(r2); + assertEquals(r1, queue.peek()); + + queue.remove(r1); + queue.remove(r2); + assertTrue(queue.isEmpty()); + + // when both non-guaranteed and finishable, with the same work completed + // pick the one with the LEAST wait-time + r1 = createTaskWrapper( + createSubmitWorkRequestProto(1, 4, 100, 200, false), false, 100000); + r2 = createTaskWrapper( + createSubmitWorkRequestProto(2, 4, 100, 300, false), false, 100000); + + queue.offer(r1); + queue.offer(r2); + assertEquals(r2, queue.peek()); + } + org.slf4j.Logger LOG = org.slf4j.LoggerFactory.getLogger(TestTaskExecutorService.class); @Test(timeout = 20000) public void testFinishablePreemptsNonFinishable() throws InterruptedException {