diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/ShortestJobFirstComparator.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/ShortestJobFirstComparator.java index 9d7af7ebfb..9f4ea93fdd 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/ShortestJobFirstComparator.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/ShortestJobFirstComparator.java @@ -26,7 +26,7 @@ public int compareInternal(TaskRunnerCallable o1, TaskRunnerCallable o2) { LlapDaemonProtocolProtos.FragmentRuntimeInfo fri1 = o1.getFragmentRuntimeInfo(); LlapDaemonProtocolProtos.FragmentRuntimeInfo fri2 = o2.getFragmentRuntimeInfo(); - // Check if these belong to the same task, and work with withinDagPriority + // Check if these belong to the same DAG, and work with withinDagPriority if (o1.getQueryId().equals(o2.getQueryId())) { // Same Query @@ -44,10 +44,16 @@ public int compareInternal(TaskRunnerCallable o1, TaskRunnerCallable o2) { // it's parent hierarchy. selfAndUpstreamComplete indicates how many of these have completed. int knownPending1 = fri1.getNumSelfAndUpstreamTasks() - fri1.getNumSelfAndUpstreamCompletedTasks(); int knownPending2 = fri2.getNumSelfAndUpstreamTasks() - fri2.getNumSelfAndUpstreamCompletedTasks(); - // longer the wait time for an attempt wrt to its start time, higher the priority it gets + // Multi-task vertex attempt: longer wait-time wrt to its start time -> higher priority long waitTime1 = fri1.getCurrentAttemptStartTime() - fri1.getFirstAttemptStartTime(); long waitTime2 = fri2.getCurrentAttemptStartTime() - fri2.getFirstAttemptStartTime(); + // Single-task vertex attempt: earlier start-time -> higher priority + if (fri1.getNumSelfAndUpstreamTasks() == fri2.getNumSelfAndUpstreamTasks() && + fri1.getNumSelfAndUpstreamTasks() == 1) { + return (int) (fri1.getCurrentAttemptStartTime() - fri2.getCurrentAttemptStartTime()); + } + if (waitTime1 == 0 || waitTime2 == 0) { return knownPending1 - knownPending2; } diff --git llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java index 69e1d871fa..5a1f3f45ac 100644 --- llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java +++ llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java @@ -113,18 +113,25 @@ public static SubmitWorkRequestProto createSubmitWorkRequestProto( currentAttemptStartTime, 1, isGuaranteed); } + public static SubmitWorkRequestProto createSubmitWorkRequestProto( + int fragmentNumber, int selfAndUpstreamParallelism, long firstAttemptStartTime, + long currentAttemptStartTime, String dagName, int numCompletedTasks) { + return createSubmitWorkRequestProto(fragmentNumber, selfAndUpstreamParallelism, 0, firstAttemptStartTime, + currentAttemptStartTime, 1, dagName, false, numCompletedTasks); + } + public static SubmitWorkRequestProto createSubmitWorkRequestProto( int fragmentNumber, int selfAndUpstreamParallelism, long firstAttemptStartTime, long currentAttemptStartTime, String dagName) { return createSubmitWorkRequestProto(fragmentNumber, selfAndUpstreamParallelism, 0, firstAttemptStartTime, - currentAttemptStartTime, 1, dagName, false); + currentAttemptStartTime, 1, dagName, false, 0); } public static SubmitWorkRequestProto createSubmitWorkRequestProto( int fragmentNumber, int selfAndUpstreamParallelism, long firstAttemptStartTime, long currentAttemptStartTime, String dagName, boolean isGuaranteed) { return createSubmitWorkRequestProto(fragmentNumber, selfAndUpstreamParallelism, 0, firstAttemptStartTime, - currentAttemptStartTime, 1, dagName, isGuaranteed); + currentAttemptStartTime, 1, dagName, isGuaranteed, 0); } public static SubmitWorkRequestProto createSubmitWorkRequestProto( @@ -132,7 +139,7 @@ public static SubmitWorkRequestProto createSubmitWorkRequestProto( int selfAndUpstreamComplete, long firstAttemptStartTime, long currentAttemptStartTime, int withinDagPriority) { return createSubmitWorkRequestProto(fragmentNumber, selfAndUpstreamParallelism, 0, firstAttemptStartTime, - currentAttemptStartTime, withinDagPriority, "MockDag", false); + currentAttemptStartTime, withinDagPriority, "MockDag", false, 0); } public static SubmitWorkRequestProto createSubmitWorkRequestProto( @@ -140,14 +147,14 @@ public static SubmitWorkRequestProto createSubmitWorkRequestProto( int selfAndUpstreamComplete, long firstAttemptStartTime, long currentAttemptStartTime, int withinDagPriority, boolean isGuaranteed) { return createSubmitWorkRequestProto(fragmentNumber, selfAndUpstreamParallelism, 0, firstAttemptStartTime, - currentAttemptStartTime, withinDagPriority, "MockDag", isGuaranteed); + currentAttemptStartTime, withinDagPriority, "MockDag", isGuaranteed, 0); } public static SubmitWorkRequestProto createSubmitWorkRequestProto( int fragmentNumber, int selfAndUpstreamParallelism, int selfAndUpstreamComplete, long firstAttemptStartTime, long currentAttemptStartTime, int withinDagPriority, String dagName, - boolean isGuaranteed) { + boolean isGuaranteed, int numCompletedTasks) { ApplicationId appId = ApplicationId.newInstance(9999, 72); TezDAGID dagId = TezDAGID.getInstance(appId, 1); TezVertexID vId = TezVertexID.getInstance(dagId, 35); @@ -186,6 +193,7 @@ public static SubmitWorkRequestProto createSubmitWorkRequestProto( .setNumSelfAndUpstreamTasks(selfAndUpstreamParallelism) .setNumSelfAndUpstreamCompletedTasks(selfAndUpstreamComplete) .setWithinDagPriority(withinDagPriority) + .setNumSelfAndUpstreamCompletedTasks(numCompletedTasks) .build()) .build(); } diff --git llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestShortestJobFirstComparator.java llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestShortestJobFirstComparator.java index 048e1d7510..c07765cda3 100644 --- llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestShortestJobFirstComparator.java +++ llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestShortestJobFirstComparator.java @@ -18,9 +18,11 @@ import static org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorTestHelpers.createTaskWrapper; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import org.apache.hadoop.hive.llap.daemon.impl.EvictingPriorityBlockingQueue; import org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorService.TaskWrapper; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; import org.junit.Test; public class TestShortestJobFirstComparator { @@ -82,17 +84,20 @@ public void testWaitQueueComparator() throws InterruptedException { assertNull(queue.offer(r1, 0)); assertEquals(r1, queue.peek()); assertNull(queue.offer(r2, 0)); + // q2 can not finish thus q1 remains in top assertEquals(r1, queue.peek()); assertNull(queue.offer(r3, 0)); - assertEquals(r1, queue.peek()); + // q3 is a single-task vertex that started before q1 so it will take its place + assertEquals(r3, queue.peek()); assertNull(queue.offer(r4, 0)); - assertEquals(r1, queue.peek()); - // offer accepted and r4 gets evicted - assertEquals(r4, queue.offer(r5, 0)); - assertEquals(r1, queue.take()); + // q4 can not finish thus q3 remains in top + assertEquals(r3, queue.peek()); + // offer accepted and r2 gets evicted (later start-time than q4) + assertEquals(r2, queue.offer(r5, 0)); assertEquals(r3, queue.take()); + assertEquals(r1, queue.take()); assertEquals(r5, queue.take()); - assertEquals(r2, queue.take()); + assertEquals(r4, queue.take()); r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100, 200, "q1"), true, 100000); r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200, 300, "q2"), false, 100000); @@ -228,9 +233,9 @@ public void testWaitQueueComparatorWithinSameDagPriority() throws InterruptedExc @Test(timeout = 60000) public void testWaitQueueComparatorParallelism() throws InterruptedException { - TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 10, 3, 10, 100, 1, "q1", false), false, 100000); // 7 pending - TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 10, 7, 10, 100, 1, "q2", false), false, 100000); // 3 pending - TaskWrapper r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 10, 5, 10, 100, 1, "q3", false), false, 100000); // 5 pending + TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 10, 3, 10, 100, 1, "q1", false, 0), false, 100000); // 7 pending + TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 10, 7, 10, 100, 1, "q2", false, 0), false, 100000); // 3 pending + TaskWrapper r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 10, 5, 10, 100, 1, "q3", false, 0), false, 100000); // 5 pending EvictingPriorityBlockingQueue queue = new EvictingPriorityBlockingQueue<>( new ShortestJobFirstComparator(), 4); @@ -278,4 +283,134 @@ public void testWaitQueueComparatorAging() throws InterruptedException { assertEquals(r3, queue.take()); assertEquals(r1, queue.take()); } + + @Test(timeout = 60000) + public void testWaitQueueAging() throws InterruptedException { + // Different Queries (DAGs) where all (different) fragments have + // upstream parallelism of 1. They also have 1 task, which means first + // & current attempt time would be the same. + TaskWrapper[] r = new TaskWrapper[50]; + + for (int i = 0; i < 50; i++) { + LlapDaemonProtocolProtos.SubmitWorkRequestProto proto = + createSubmitWorkRequestProto(i, 1, 100, 100 + i, "q" + i); + r[i] = createTaskWrapper(proto, true, 100000); + } + + // Make sure we dont have evictions triggered (maxSize = taskSize) + EvictingPriorityBlockingQueue queue = + new EvictingPriorityBlockingQueue<>(new ShortestJobFirstComparator(), 50); + + for (int i = 0; i < 50; i++) { + assertNull(queue.offer(r[i], 0)); + } + + TaskWrapper prev = queue.take(); + for (int i = 1; i < 50; i++) { + TaskWrapper curr = queue.take(); + // Make sure that earlier requests were scheduled first + assertTrue(curr.getRequestId().compareTo(prev.getRequestId()) > 0); + prev = curr; + } + } + + @Test(timeout = 60000) + public void testWaitQueueAgingMulti() throws InterruptedException { + // Make sure we dont have evictions triggered (maxSize = taskSize) + EvictingPriorityBlockingQueue queue = + new EvictingPriorityBlockingQueue<>(new ShortestJobFirstComparator(), 10); + + // Single task DAG with same start and attempt time (wait-time zero) + TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 1, 1000, 1000, "q11"), true, 1000); + // Multi task DAG with same start-time as above but 11 out of 12 task completed! + TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 12, 1000, 1500, "q12", 11), true, 1000); + assertNull(queue.offer(r1, 0)); + assertEquals(r1, queue.peek()); + assertNull(queue.offer(r2, 0)); + assertEquals(r2, queue.peek()); // same number of pending tasks + + queue.remove(r1); + queue.remove(r2); + assertTrue(queue.isEmpty()); + + // Single task DAG with different start and attempt time + r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 1, 800, 1000, "q11"), true, 1000); + assertNull(queue.offer(r1, 0)); + assertEquals(r1, queue.peek()); // ratio = 1/200 = 0.005 + assertNull(queue.offer(r2, 0)); + assertEquals(r2, queue.peek()); // ratio = 1/500 = 0.002 + + queue.remove(r1); + queue.remove(r2); + assertTrue(queue.isEmpty()); + + // Single task DAG with different start and attempt time + r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 1, 800, 1000, "q11"), true, 1000); + // Multi-task DAG with 5 out of 12 + r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 12, 1000, 1500, "q12", 5), true, 1000); + + // pending/wait-time -> r2 has lower priority because it has more pending tasks + assertNull(queue.offer(r1, 0)); + assertEquals(r1, queue.peek()); // ratio = 1/200 = 0.005 + assertNull(queue.offer(r2, 0)); + assertEquals(r1, queue.peek()); // ratio = 7/500 = 0.014 + } + + @Test(timeout = 60000) + public void testWaitQueueAgingComplex() throws InterruptedException { + // Make sure we dont have evictions triggered (maxSize = taskSize) + EvictingPriorityBlockingQueue queue = + new EvictingPriorityBlockingQueue<>(new ShortestJobFirstComparator(), 10); + + // Single-Task DAGs + TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(5, 1, 100, 1000, "q5"), true, 1000); + TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(4, 1, 200, 900, "q4"), true, 1000); + TaskWrapper r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 1, 300, 800, "q3"), true, 1000); + TaskWrapper r4 = createTaskWrapper(createSubmitWorkRequestProto(2, 1, 400, 700, "q2"), true, 1000); + TaskWrapper r5 = createTaskWrapper(createSubmitWorkRequestProto(1, 1, 500, 600, "q1"), true, 1000); + + assertNull(queue.offer(r1, 0)); + assertEquals(r1, queue.peek()); + assertNull(queue.offer(r2, 0)); + assertEquals(r2, queue.peek()); + assertNull(queue.offer(r3, 0)); + assertEquals(r3, queue.peek()); + assertNull(queue.offer(r4, 0)); + assertEquals(r4, queue.peek()); + assertNull(queue.offer(r5, 0)); + assertEquals(r5, queue.peek()); + + // Multi-Task DAGs + TaskWrapper r6 = createTaskWrapper(createSubmitWorkRequestProto(10, 10, 100, 1000, "q10"), true, 1000); + TaskWrapper r7 = createTaskWrapper(createSubmitWorkRequestProto(9, 10, 200, 900, "q9"), true, 1000); + TaskWrapper r8 = createTaskWrapper(createSubmitWorkRequestProto(8, 10, 300, 800, "q8"), true, 1000); + TaskWrapper r9 = createTaskWrapper(createSubmitWorkRequestProto(7, 10, 400, 700, "q7"), true, 1000); + TaskWrapper r10 = createTaskWrapper(createSubmitWorkRequestProto(6, 10, 500, 600, "q6"), true, 1000); + + assertNull(queue.offer(r6, 0)); + assertEquals(r5, queue.peek()); + assertNull(queue.offer(r7, 0)); + assertEquals(r5, queue.peek()); + assertNull(queue.offer(r8, 0)); + assertEquals(r5, queue.peek()); + assertNull(queue.offer(r9, 0)); + assertEquals(r5, queue.peek()); + assertNull(queue.offer(r10, 0)); + assertEquals(r5, queue.peek()); + + + TaskWrapper prev = queue.take(); + for (int i = 1; i < 10; i++) { + TaskWrapper curr = queue.take(); + // Single Task vertices have lower ratio so they are always scheduled first (1/wait-time instead of 10/wait-time) + if (i <= 5) { + assertTrue(curr.getRequestId().compareTo(prev.getRequestId()) > 0); + } + // Multi-task vertices are scheduled based on wait time (so 10->5 in descending order) + else { + assertTrue(curr.getRequestId().compareTo(prev.getRequestId()) < 0); + } + prev = curr; + } + } }