diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/FirstInFirstOutComparator.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/FirstInFirstOutComparator.java index 447fc7b..ae1ca5d 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/FirstInFirstOutComparator.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/FirstInFirstOutComparator.java @@ -53,11 +53,7 @@ public int compare(TaskWrapper t1, TaskWrapper t2) { if (o1.getQueryId().equals(o2.getQueryId())) { // Same Query // Within dag priority - lower values indicate higher priority. - if (fri1.getWithinDagPriority() < fri2.getWithinDagPriority()) { - return -1; - } else if (fri1.getWithinDagPriority() > fri2.getWithinDagPriority()){ - return 1; - } + return Integer.compare(fri1.getWithinDagPriority(), fri2.getWithinDagPriority()); } if (fri1.getFirstAttemptStartTime() < fri2.getFirstAttemptStartTime()) { diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/ShortestJobFirstComparator.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/ShortestJobFirstComparator.java index 9b6c894..b54f740 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/ShortestJobFirstComparator.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/ShortestJobFirstComparator.java @@ -43,11 +43,7 @@ public int compare(TaskWrapper t1, TaskWrapper t2) { if (o1.getQueryId().equals(o2.getQueryId())) { // Same Query // Within dag priority - lower values indicate higher priority. - if (fri1.getWithinDagPriority() < fri2.getWithinDagPriority()) { - return -1; - } else if (fri1.getWithinDagPriority() > fri2.getWithinDagPriority()){ - return 1; - } + return Integer.compare(fri1.getWithinDagPriority(), fri2.getWithinDagPriority()); } // Compute knownPending tasks. selfAndUpstream indicates task counts for current vertex and diff --git llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java index 2cd6542..6506d07 100644 --- llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java +++ llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java @@ -90,9 +90,24 @@ public static SubmitWorkRequestProto createSubmitWorkRequestProto( } public static SubmitWorkRequestProto createSubmitWorkRequestProto( + int fragmentNumber, int selfAndUpstreamParallelism, long firstAttemptStartTime, + long currentAttemptStartTime, String dagName) { + return createSubmitWorkRequestProto(fragmentNumber, selfAndUpstreamParallelism, 0, firstAttemptStartTime, + currentAttemptStartTime, 1, dagName); + } + + public static SubmitWorkRequestProto createSubmitWorkRequestProto( int fragmentNumber, int selfAndUpstreamParallelism, int selfAndUpstreamComplete, long firstAttemptStartTime, long currentAttemptStartTime, int withinDagPriority) { + return createSubmitWorkRequestProto(fragmentNumber, selfAndUpstreamParallelism, 0, firstAttemptStartTime, + currentAttemptStartTime, withinDagPriority, "MockDag"); + } + + public static SubmitWorkRequestProto createSubmitWorkRequestProto( + int fragmentNumber, int selfAndUpstreamParallelism, + int selfAndUpstreamComplete, long firstAttemptStartTime, + long currentAttemptStartTime, int withinDagPriority, String dagName) { ApplicationId appId = ApplicationId.newInstance(9999, 72); TezDAGID dagId = TezDAGID.getInstance(appId, 1); TezVertexID vId = TezVertexID.getInstance(dagId, 35); @@ -103,7 +118,7 @@ public static SubmitWorkRequestProto createSubmitWorkRequestProto( .setWorkSpec( VertexOrBinary.newBuilder().setVertex( SignableVertexSpec.newBuilder() - .setDagName("MockDag") + .setDagName(dagName) .setUser("MockUser") .setTokenIdentifier("MockToken_1") .setQueryIdentifier( diff --git llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java index 5ea62aa..8cce0cb 100644 --- llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java +++ llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java @@ -39,14 +39,23 @@ public class TestFirstInFirstOutComparator { private SubmitWorkRequestProto createRequest(int fragmentNumber, int numSelfAndUpstreamTasks, int dagStartTime, - int attemptStartTime) { + int attemptStartTime) { // Same priority for all tasks. return createRequest(fragmentNumber, numSelfAndUpstreamTasks, 0, dagStartTime, attemptStartTime, 1); } private SubmitWorkRequestProto createRequest(int fragmentNumber, int numSelfAndUpstreamTasks, - int numSelfAndUpstreamComplete, int dagStartTime, - int attemptStartTime, int withinDagPriority) { + int numSelfAndUpstreamComplete, int dagStartTime, + int attemptStartTime, int withinDagPriority) { + return createRequest(fragmentNumber, numSelfAndUpstreamTasks, numSelfAndUpstreamComplete, + dagStartTime, attemptStartTime, withinDagPriority, "MockDag"); + } + + + private SubmitWorkRequestProto createRequest(int fragmentNumber, int numSelfAndUpstreamTasks, + int numSelfAndUpstreamComplete, int dagStartTime, + int attemptStartTime, int withinDagPriority, + String dagName) { ApplicationId appId = ApplicationId.newInstance(9999, 72); TezDAGID dagId = TezDAGID.getInstance(appId, 1); TezVertexID vId = TezVertexID.getInstance(dagId, 35); @@ -58,22 +67,22 @@ private SubmitWorkRequestProto createRequest(int fragmentNumber, int numSelfAndU .setFragmentNumber(fragmentNumber) .setWorkSpec( VertexOrBinary.newBuilder().setVertex( - SignableVertexSpec - .newBuilder() - .setQueryIdentifier( - QueryIdentifierProto.newBuilder() - .setApplicationIdString(appId.toString()) - .setAppAttemptNumber(0) - .setDagIndex(dagId.getId()) - .build()) - .setVertexIndex(vId.getId()) - .setDagName("MockDag") - .setVertexName("MockVertex") - .setUser("MockUser") - .setTokenIdentifier("MockToken_1") - .setProcessorDescriptor( - EntityDescriptorProto.newBuilder().setClassName("MockProcessor").build()) - .build()).build()) + SignableVertexSpec + .newBuilder() + .setQueryIdentifier( + QueryIdentifierProto.newBuilder() + .setApplicationIdString(appId.toString()) + .setAppAttemptNumber(0) + .setDagIndex(dagId.getId()) + .build()) + .setVertexIndex(vId.getId()) + .setDagName(dagName) + .setVertexName("MockVertex") + .setUser("MockUser") + .setTokenIdentifier("MockToken_1") + .setProcessorDescriptor( + EntityDescriptorProto.newBuilder().setClassName("MockProcessor").build()) + .build()).build()) .setAmHost("localhost") .setAmPort(12345) .setContainerIdString("MockContainer_1") @@ -240,8 +249,8 @@ public void testWaitQueueComparator() throws InterruptedException { assertEquals(r4, queue.peek()); // offer accepted, r1 evicted assertEquals(r1, queue.offer(r5)); - assertEquals(r4, queue.take()); assertEquals(r5, queue.take()); + assertEquals(r4, queue.take()); assertEquals(r3, queue.take()); assertEquals(r2, queue.take()); } @@ -265,10 +274,28 @@ public void testWaitQueueComparatorWithinDagPriority() throws InterruptedExcepti } @Test(timeout = 60000) + public void testWaitQueueComparatorWithinSameDagPriority() throws InterruptedException { + TaskWrapper r1 = createTaskWrapper(createRequest(1, 1, 0, 10, 100, 10), true, 100000); + TaskWrapper r2 = createTaskWrapper(createRequest(2, 1, 0, 10, 100, 10), true, 100000); + TaskWrapper r3 = createTaskWrapper(createRequest(3, 1, 0, 10, 100, 10), true, 100000); + + EvictingPriorityBlockingQueue queue = new EvictingPriorityBlockingQueue<>( + new ShortestJobFirstComparator(), 3); + + assertNull(queue.offer(r1)); + assertNull(queue.offer(r2)); + assertNull(queue.offer(r3)); + + // can not queue more requests as queue is full + TaskWrapper r4 = createTaskWrapper(createRequest(4, 1, 0, 10, 100, 10), true, 100000); + assertEquals(r4, queue.offer(r4)); + } + + @Test(timeout = 60000) public void testWaitQueueComparatorParallelism() throws InterruptedException { - TaskWrapper r1 = createTaskWrapper(createRequest(1, 10, 3, 100, 100, 1), false, 100000); - TaskWrapper r2 = createTaskWrapper(createRequest(2, 10, 7, 100, 100, 1), false, 100000); - TaskWrapper r3 = createTaskWrapper(createRequest(3, 10, 5, 100, 100, 1), false, 100000); + TaskWrapper r1 = createTaskWrapper(createRequest(1, 10, 3, 100, 100, 1, "q1"), false, 100000); + TaskWrapper r2 = createTaskWrapper(createRequest(2, 10, 7, 100, 100, 1, "q2"), false, 100000); + TaskWrapper r3 = createTaskWrapper(createRequest(3, 10, 5, 100, 100, 1, "q3"), false, 100000); EvictingPriorityBlockingQueue queue = new EvictingPriorityBlockingQueue<>( new FirstInFirstOutComparator(), 4); diff --git llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestShortestJobFirstComparator.java llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestShortestJobFirstComparator.java index e82f756..0059d0c 100644 --- llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestShortestJobFirstComparator.java +++ llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestShortestJobFirstComparator.java @@ -28,11 +28,11 @@ @Test(timeout = 60000) public void testWaitQueueComparator() throws InterruptedException { - TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100, 200), false, 100000); - TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200, 300), false, 100000); - TaskWrapper r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300, 400), false, 1000000); - TaskWrapper r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400, 500), false, 1000000); - TaskWrapper r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500, 600), false, 1000000); + TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100, 200, "q1"), false, 100000); + TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200, 300, "q2"), false, 100000); + TaskWrapper r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300, 400, "q3"), false, 1000000); + TaskWrapper r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400, 500, "q4"), false, 1000000); + TaskWrapper r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500, 600, "q5"), false, 1000000); EvictingPriorityBlockingQueue queue = new EvictingPriorityBlockingQueue<>( new ShortestJobFirstComparator(), 4); assertNull(queue.offer(r1)); @@ -50,11 +50,11 @@ public void testWaitQueueComparator() throws InterruptedException { assertEquals(r3, queue.take()); assertEquals(r4, queue.take()); - r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100, 200), true, 100000); - r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200, 300), true, 100000); - r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300, 400), true, 1000000); - r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400, 500), true, 1000000); - r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500, 600), true, 1000000); + r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100, 200, "q1"), true, 100000); + r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200, 300, "q2"), true, 100000); + r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300, 400, "q3"), true, 1000000); + r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400, 500, "q4"), true, 1000000); + r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500, 600, "q5"), true, 1000000); queue = new EvictingPriorityBlockingQueue( new ShortestJobFirstComparator(), 4); assertNull(queue.offer(r1)); @@ -72,11 +72,11 @@ public void testWaitQueueComparator() throws InterruptedException { assertEquals(r3, queue.take()); assertEquals(r4, queue.take()); - r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 1, 100, 1000), true, 100000); - r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 1, 200, 900), false, 100000); - r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 1, 300, 800), true, 1000000); - r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 1, 400, 700), false, 1000000); - r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500, 600), true, 1000000); + r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 1, 100, 1000, "q1"), true, 100000); + r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 1, 200, 900, "q2"), false, 100000); + r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 1, 300, 800, "q3"), true, 1000000); + r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 1, 400, 700, "q4"), false, 1000000); + r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500, 600, "q5"), true, 1000000); queue = new EvictingPriorityBlockingQueue( new ShortestJobFirstComparator(), 4); assertNull(queue.offer(r1)); @@ -94,11 +94,11 @@ public void testWaitQueueComparator() throws InterruptedException { assertEquals(r5, queue.take()); assertEquals(r2, queue.take()); - r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100, 200), true, 100000); - r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200, 300), false, 100000); - r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300, 400), true, 1000000); - r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400, 500), false, 1000000); - r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500, 600), true, 1000000); + r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100, 200, "q1"), true, 100000); + r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200, 300, "q2"), false, 100000); + r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300, 400, "q3"), true, 1000000); + r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400, 500, "q4"), false, 1000000); + r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500, 600, "q5"), true, 1000000); queue = new EvictingPriorityBlockingQueue( new ShortestJobFirstComparator(), 4); assertNull(queue.offer(r1)); @@ -116,11 +116,11 @@ public void testWaitQueueComparator() throws InterruptedException { assertEquals(r5, queue.take()); assertEquals(r2, queue.take()); - r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100, 200), true, 100000); - r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200, 300), false, 100000); - r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300, 400), false, 1000000); - r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400, 500), false, 1000000); - r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500, 600), true, 1000000); + r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100, 200, "q1"), true, 100000); + r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200, 300, "q2"), false, 100000); + r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300, 400, "q3"), false, 1000000); + r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400, 500, "q4"), false, 1000000); + r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500, 600, "q5"), true, 1000000); queue = new EvictingPriorityBlockingQueue( new ShortestJobFirstComparator(), 4); assertNull(queue.offer(r1)); @@ -138,11 +138,11 @@ public void testWaitQueueComparator() throws InterruptedException { assertEquals(r2, queue.take()); assertEquals(r3, queue.take()); - r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100, 200), false, 100000); - r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200, 300), true, 100000); - r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300, 400), true, 1000000); - r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400, 500), true, 1000000); - r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500, 600), true, 1000000); + r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100, 200, "q1"), false, 100000); + r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200, 300, "q2"), true, 100000); + r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300, 400, "q3"), true, 1000000); + r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400, 500, "q4"), true, 1000000); + r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500, 600, "q5"), true, 1000000); queue = new EvictingPriorityBlockingQueue( new ShortestJobFirstComparator(), 4); assertNull(queue.offer(r1)); @@ -180,10 +180,28 @@ public void testWaitQueueComparatorWithinDagPriority() throws InterruptedExcepti } @Test(timeout = 60000) + public void testWaitQueueComparatorWithinSameDagPriority() throws InterruptedException { + TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 1, 0, 10, 100, 10), true, 100000); + TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 1, 0, 10, 100, 10), true, 100000); + TaskWrapper r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 1, 0, 10, 100, 10), true, 100000); + + EvictingPriorityBlockingQueue queue = new EvictingPriorityBlockingQueue<>( + new ShortestJobFirstComparator(), 3); + + assertNull(queue.offer(r1)); + assertNull(queue.offer(r2)); + assertNull(queue.offer(r3)); + + // can not queue more requests as queue is full + TaskWrapper r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 1, 0, 10, 100, 10), true, 100000); + assertEquals(r4, queue.offer(r4)); + } + + @Test(timeout = 60000) public void testWaitQueueComparatorParallelism() throws InterruptedException { - TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 10, 3, 10, 100, 1), false, 100000); // 7 pending - TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 10, 7, 10, 100, 1), false, 100000); // 3 pending - TaskWrapper r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 10, 5, 10, 100, 1), false, 100000); // 5 pending + TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 10, 3, 10, 100, 1, "q1"), false, 100000); // 7 pending + TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 10, 7, 10, 100, 1, "q2"), false, 100000); // 3 pending + TaskWrapper r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 10, 5, 10, 100, 1, "q3"), false, 100000); // 5 pending EvictingPriorityBlockingQueue queue = new EvictingPriorityBlockingQueue<>( new ShortestJobFirstComparator(), 4); @@ -199,12 +217,12 @@ public void testWaitQueueComparatorParallelism() throws InterruptedException { @Test(timeout = 60000) public void testWaitQueueComparatorAging() throws InterruptedException { - TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 10, 100, 200), true, 100000); - TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 20, 100, 200), true, 100000); - TaskWrapper r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 30, 100, 200), true, 100000); + TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 10, 100, 200, "q1"), true, 100000); + TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 20, 100, 200, "q2"), true, 100000); + TaskWrapper r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 30, 100, 200, "q3"), true, 100000); EvictingPriorityBlockingQueue queue = new EvictingPriorityBlockingQueue<>( - new ShortestJobFirstComparator(), 4); + new ShortestJobFirstComparator(), 4); assertNull(queue.offer(r1)); assertNull(queue.offer(r2)); @@ -215,11 +233,11 @@ public void testWaitQueueComparatorAging() throws InterruptedException { assertEquals(r3, queue.take()); // priority = 10 / (200 - 100) = 0.01 - r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 10, 100, 200), true, 100000); + r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 10, 100, 200, "q1"), true, 100000); // priority = 20 / (3000 - 100) = 0.0069 - r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 20, 100, 3000), true, 100000); + r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 20, 100, 3000, "q2"), true, 100000); // priority = 30 / (4000 - 100) = 0.0076 - r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 30, 100, 4000), true, 100000); + r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 30, 100, 4000, "q3"), true, 100000); queue = new EvictingPriorityBlockingQueue<>(new ShortestJobFirstComparator(), 4);