diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/ShortestJobFirstComparator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/ShortestJobFirstComparator.java index 238ae9e..9b6c894 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/ShortestJobFirstComparator.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/ShortestJobFirstComparator.java @@ -54,17 +54,22 @@ public int compare(TaskWrapper t1, TaskWrapper t2) { // it's parent hierarchy. selfAndUpstreamComplete indicates how many of these have completed. int knownPending1 = fri1.getNumSelfAndUpstreamTasks() - fri1.getNumSelfAndUpstreamCompletedTasks(); int knownPending2 = fri2.getNumSelfAndUpstreamTasks() - fri2.getNumSelfAndUpstreamCompletedTasks(); - if (knownPending1 < knownPending2) { - return -1; - } else if (knownPending1 > knownPending2) { - return 1; + // longer the wait time for an attempt wrt to its start time, higher the priority it gets + long waitTime1 = fri1.getCurrentAttemptStartTime() - fri1.getFirstAttemptStartTime(); + long waitTime2 = fri2.getCurrentAttemptStartTime() - fri2.getFirstAttemptStartTime(); + + if (waitTime1 == 0 || waitTime2 == 0) { + return knownPending1 - knownPending2; } - if (fri1.getFirstAttemptStartTime() < fri2.getFirstAttemptStartTime()) { + double ratio1 = (double) knownPending1 / (double) waitTime1; + double ratio2 = (double) knownPending2 / (double) waitTime2; + if (ratio1 < ratio2) { return -1; - } else if (fri1.getFirstAttemptStartTime() > fri2.getFirstAttemptStartTime()) { + } else if (ratio1 > ratio2) { return 1; } + return 0; } } diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java index 73bb68e..2cd6542 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java @@ -47,10 +47,10 @@ private static final Logger LOG = LoggerFactory.getLogger(TestTaskExecutorService.class); - public static MockRequest createMockRequest(int fragmentNum, int parallelism, long startTime, - boolean canFinish, long workTime) { + public static MockRequest createMockRequest(int fragmentNum, int parallelism, long firstAttemptStartTime, + long currentAttemptStartTime, boolean canFinish, long workTime) { SubmitWorkRequestProto - request = createSubmitWorkRequestProto(fragmentNum, parallelism, startTime); + request = createSubmitWorkRequestProto(fragmentNum, parallelism, firstAttemptStartTime, currentAttemptStartTime); return createMockRequest(canFinish, workTime, request); } @@ -83,16 +83,16 @@ public static QueryInfo createQueryInfo() { } public static SubmitWorkRequestProto createSubmitWorkRequestProto( - int fragmentNumber, int selfAndUpstreamParallelism, - long attemptStartTime) { - return createSubmitWorkRequestProto(fragmentNumber, selfAndUpstreamParallelism, 0, - attemptStartTime, 1); + int fragmentNumber, int selfAndUpstreamParallelism, long firstAttemptStartTime, + long currentAttemptStartTime) { + return createSubmitWorkRequestProto(fragmentNumber, selfAndUpstreamParallelism, 0, firstAttemptStartTime, + currentAttemptStartTime, 1); } public static SubmitWorkRequestProto createSubmitWorkRequestProto( int fragmentNumber, int selfAndUpstreamParallelism, - int selfAndUpstreamComplete, - long attemptStartTime, int withinDagPriority) { + int selfAndUpstreamComplete, long firstAttemptStartTime, + long currentAttemptStartTime, int withinDagPriority) { ApplicationId appId = ApplicationId.newInstance(9999, 72); TezDAGID dagId = TezDAGID.getInstance(appId, 1); TezVertexID vId = TezVertexID.getInstance(dagId, 35); @@ -124,7 +124,8 @@ public static SubmitWorkRequestProto createSubmitWorkRequestProto( .setFragmentRuntimeInfo(LlapDaemonProtocolProtos .FragmentRuntimeInfo .newBuilder() - .setFirstAttemptStartTime(attemptStartTime) + .setFirstAttemptStartTime(firstAttemptStartTime) + .setCurrentAttemptStartTime(currentAttemptStartTime) .setNumSelfAndUpstreamTasks(selfAndUpstreamParallelism) .setNumSelfAndUpstreamCompletedTasks(selfAndUpstreamComplete) .setWithinDagPriority(withinDagPriority) diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java index ac4e5f1..de7f2fc 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java @@ -49,10 +49,10 @@ @Test(timeout = 5000) public void testPreemptionQueueComparator() throws InterruptedException { - TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100), false, 100000); - TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200), false, 100000); - TaskWrapper r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300), false, 1000000); - TaskWrapper r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400), false, 1000000); + TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100, 200), false, 100000); + TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200, 300), false, 100000); + TaskWrapper r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300, 400), false, 1000000); + TaskWrapper r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400, 500), false, 1000000); BlockingQueue queue = new PriorityBlockingQueue<>(4, new TaskExecutorService.PreemptionQueueComparator()); @@ -71,8 +71,8 @@ public void testPreemptionQueueComparator() throws InterruptedException { @Test(timeout = 10000) public void testFinishablePreeptsNonFinishable() throws InterruptedException { - MockRequest r1 = createMockRequest(1, 1, 100, false, 5000l); - MockRequest r2 = createMockRequest(2, 1, 100, true, 1000l); + MockRequest r1 = createMockRequest(1, 1, 100, 200, false, 5000l); + MockRequest r2 = createMockRequest(2, 1, 100, 200, true, 1000l); TaskExecutorServiceForTest taskExecutorService = new TaskExecutorServiceForTest(1, 2, ShortestJobFirstComparator.class.getName(), true); taskExecutorService.init(new Configuration()); @@ -110,7 +110,7 @@ public void testFinishablePreeptsNonFinishable() throws InterruptedException { @Test(timeout = 10000) public void testPreemptionStateOnTaskMoveToFinishableState() throws InterruptedException { - MockRequest r1 = createMockRequest(1, 1, 100, false, 20000l); + MockRequest r1 = createMockRequest(1, 1, 100, 200, false, 20000l); TaskExecutorServiceForTest taskExecutorService = new TaskExecutorServiceForTest(1, 2, ShortestJobFirstComparator.class.getName(), true); @@ -142,7 +142,7 @@ public void testPreemptionStateOnTaskMoveToFinishableState() throws InterruptedE @Test(timeout = 10000) public void testPreemptionStateOnTaskMoveToNonFinishableState() throws InterruptedException { - MockRequest r1 = createMockRequest(1, 1, 100, true, 20000l); + MockRequest r1 = createMockRequest(1, 1, 100, 200, true, 20000l); TaskExecutorServiceForTest taskExecutorService = new TaskExecutorServiceForTest(1, 2, ShortestJobFirstComparator.class.getName(), true); @@ -176,11 +176,11 @@ public void testPreemptionStateOnTaskMoveToNonFinishableState() throws Interrupt @Test(timeout = 10000) public void testWaitQueuePreemption() throws InterruptedException { - MockRequest r1 = createMockRequest(1, 1, 100, true, 20000l); - MockRequest r2 = createMockRequest(2, 1, 200, false, 20000l); - MockRequest r3 = createMockRequest(3, 1, 300, false, 20000l); - MockRequest r4 = createMockRequest(4, 1, 400, false, 20000l); - MockRequest r5 = createMockRequest(5, 1, 500, true, 20000l); + MockRequest r1 = createMockRequest(1, 1, 100, 200, true, 20000l); + MockRequest r2 = createMockRequest(2, 1, 200, 330, false, 20000l); + MockRequest r3 = createMockRequest(3, 1, 300, 420, false, 20000l); + MockRequest r4 = createMockRequest(4, 1, 400, 510, false, 20000l); + MockRequest r5 = createMockRequest(5, 1, 500, 610, true, 20000l); TaskExecutorServiceForTest taskExecutorService = new TaskExecutorServiceForTest(1, 2, ShortestJobFirstComparator.class.getName(), true); diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestShortestJobFirstComparator.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestShortestJobFirstComparator.java index f50c657..e82f756 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestShortestJobFirstComparator.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestShortestJobFirstComparator.java @@ -28,11 +28,11 @@ @Test(timeout = 60000) public void testWaitQueueComparator() throws InterruptedException { - TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100), false, 100000); - TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200), false, 100000); - TaskWrapper r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300), false, 1000000); - TaskWrapper r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400), false, 1000000); - TaskWrapper r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500), false, 1000000); + TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100, 200), false, 100000); + TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200, 300), false, 100000); + TaskWrapper r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300, 400), false, 1000000); + TaskWrapper r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400, 500), false, 1000000); + TaskWrapper r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500, 600), false, 1000000); EvictingPriorityBlockingQueue queue = new EvictingPriorityBlockingQueue<>( new ShortestJobFirstComparator(), 4); assertNull(queue.offer(r1)); @@ -50,11 +50,11 @@ public void testWaitQueueComparator() throws InterruptedException { assertEquals(r3, queue.take()); assertEquals(r4, queue.take()); - r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100), true, 100000); - r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200), true, 100000); - r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300), true, 1000000); - r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400), true, 1000000); - r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500), true, 1000000); + r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100, 200), true, 100000); + r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200, 300), true, 100000); + r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300, 400), true, 1000000); + r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400, 500), true, 1000000); + r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500, 600), true, 1000000); queue = new EvictingPriorityBlockingQueue( new ShortestJobFirstComparator(), 4); assertNull(queue.offer(r1)); @@ -72,11 +72,11 @@ public void testWaitQueueComparator() throws InterruptedException { assertEquals(r3, queue.take()); assertEquals(r4, queue.take()); - r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 1, 100), true, 100000); - r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 1, 200), false, 100000); - r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 1, 300), true, 1000000); - r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 1, 400), false, 1000000); - r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500), true, 1000000); + r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 1, 100, 1000), true, 100000); + r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 1, 200, 900), false, 100000); + r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 1, 300, 800), true, 1000000); + r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 1, 400, 700), false, 1000000); + r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500, 600), true, 1000000); queue = new EvictingPriorityBlockingQueue( new ShortestJobFirstComparator(), 4); assertNull(queue.offer(r1)); @@ -94,11 +94,11 @@ public void testWaitQueueComparator() throws InterruptedException { assertEquals(r5, queue.take()); assertEquals(r2, queue.take()); - r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100), true, 100000); - r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200), false, 100000); - r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300), true, 1000000); - r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400), false, 1000000); - r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500), true, 1000000); + r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100, 200), true, 100000); + r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200, 300), false, 100000); + r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300, 400), true, 1000000); + r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400, 500), false, 1000000); + r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500, 600), true, 1000000); queue = new EvictingPriorityBlockingQueue( new ShortestJobFirstComparator(), 4); assertNull(queue.offer(r1)); @@ -116,11 +116,11 @@ public void testWaitQueueComparator() throws InterruptedException { assertEquals(r5, queue.take()); assertEquals(r2, queue.take()); - r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100), true, 100000); - r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200), false, 100000); - r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300), false, 1000000); - r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400), false, 1000000); - r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500), true, 1000000); + r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100, 200), true, 100000); + r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200, 300), false, 100000); + r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300, 400), false, 1000000); + r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400, 500), false, 1000000); + r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500, 600), true, 1000000); queue = new EvictingPriorityBlockingQueue( new ShortestJobFirstComparator(), 4); assertNull(queue.offer(r1)); @@ -138,11 +138,11 @@ public void testWaitQueueComparator() throws InterruptedException { assertEquals(r2, queue.take()); assertEquals(r3, queue.take()); - r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100), false, 100000); - r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200), true, 100000); - r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300), true, 1000000); - r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400), true, 1000000); - r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500), true, 1000000); + r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100, 200), false, 100000); + r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200, 300), true, 100000); + r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300, 400), true, 1000000); + r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400, 500), true, 1000000); + r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500, 600), true, 1000000); queue = new EvictingPriorityBlockingQueue( new ShortestJobFirstComparator(), 4); assertNull(queue.offer(r1)); @@ -163,9 +163,9 @@ public void testWaitQueueComparator() throws InterruptedException { @Test(timeout = 60000) public void testWaitQueueComparatorWithinDagPriority() throws InterruptedException { - TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 1, 0, 100, 10), false, 100000); - TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 1, 0, 100, 1), false, 100000); - TaskWrapper r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 1, 0, 100, 5), false, 100000); + TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 1, 0, 10, 100, 10), false, 100000); + TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 1, 0, 10, 100, 1), false, 100000); + TaskWrapper r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 1, 0, 10, 100, 5), false, 100000); EvictingPriorityBlockingQueue queue = new EvictingPriorityBlockingQueue<>( new ShortestJobFirstComparator(), 4); @@ -181,9 +181,9 @@ public void testWaitQueueComparatorWithinDagPriority() throws InterruptedExcepti @Test(timeout = 60000) public void testWaitQueueComparatorParallelism() throws InterruptedException { - TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 10, 3, 100, 1), false, 100000); // 7 pending - TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 10, 7, 100, 1), false, 100000); // 3 pending - TaskWrapper r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 10, 5, 100, 1), false, 100000); // 5 pending + TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 10, 3, 10, 100, 1), false, 100000); // 7 pending + TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 10, 7, 10, 100, 1), false, 100000); // 3 pending + TaskWrapper r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 10, 5, 10, 100, 1), false, 100000); // 5 pending EvictingPriorityBlockingQueue queue = new EvictingPriorityBlockingQueue<>( new ShortestJobFirstComparator(), 4); @@ -196,4 +196,39 @@ public void testWaitQueueComparatorParallelism() throws InterruptedException { assertEquals(r3, queue.take()); assertEquals(r1, queue.take()); } + + @Test(timeout = 60000) + public void testWaitQueueComparatorAging() throws InterruptedException { + TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 10, 100, 200), true, 100000); + TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 20, 100, 200), true, 100000); + TaskWrapper r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 30, 100, 200), true, 100000); + + EvictingPriorityBlockingQueue queue = new EvictingPriorityBlockingQueue<>( + new ShortestJobFirstComparator(), 4); + + assertNull(queue.offer(r1)); + assertNull(queue.offer(r2)); + assertNull(queue.offer(r3)); + + assertEquals(r1, queue.take()); + assertEquals(r2, queue.take()); + assertEquals(r3, queue.take()); + + // priority = 10 / (200 - 100) = 0.01 + r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 10, 100, 200), true, 100000); + // priority = 20 / (3000 - 100) = 0.0069 + r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 20, 100, 3000), true, 100000); + // priority = 30 / (4000 - 100) = 0.0076 + r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 30, 100, 4000), true, 100000); + + queue = new EvictingPriorityBlockingQueue<>(new ShortestJobFirstComparator(), 4); + + assertNull(queue.offer(r1)); + assertNull(queue.offer(r2)); + assertNull(queue.offer(r3)); + + assertEquals(r2, queue.take()); + assertEquals(r3, queue.take()); + assertEquals(r1, queue.take()); + } }