diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/FirstInFirstOutComparator.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/FirstInFirstOutComparator.java index 447fc7b..5e40e17 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/FirstInFirstOutComparator.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/FirstInFirstOutComparator.java @@ -57,6 +57,8 @@ public int compare(TaskWrapper t1, TaskWrapper t2) { return -1; } else if (fri1.getWithinDagPriority() > fri2.getWithinDagPriority()){ return 1; + } else { + return 0; } } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/ShortestJobFirstComparator.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/ShortestJobFirstComparator.java index 9b6c894..c4fcc20 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/ShortestJobFirstComparator.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/ShortestJobFirstComparator.java @@ -47,6 +47,8 @@ public int compare(TaskWrapper t1, TaskWrapper t2) { return -1; } else if (fri1.getWithinDagPriority() > fri2.getWithinDagPriority()){ return 1; + } else { + return 0; } } diff --git llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java index 5ea62aa..e47dba6 100644 --- llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java +++ llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java @@ -265,6 +265,24 @@ public void testWaitQueueComparatorWithinDagPriority() throws InterruptedExcepti } @Test(timeout = 60000) + public void testWaitQueueComparatorWithinSameDagPriority() throws InterruptedException { + TaskWrapper r1 = createTaskWrapper(createRequest(1, 1, 0, 10, 100, 10), true, 100000); + TaskWrapper r2 = createTaskWrapper(createRequest(2, 1, 0, 10, 100, 10), true, 100000); + TaskWrapper r3 = createTaskWrapper(createRequest(3, 1, 0, 10, 100, 10), true, 100000); + + EvictingPriorityBlockingQueue queue = new EvictingPriorityBlockingQueue<>( + new ShortestJobFirstComparator(), 3); + + assertNull(queue.offer(r1)); + assertNull(queue.offer(r2)); + assertNull(queue.offer(r3)); + + // can not queue more requests as queue is full + TaskWrapper r4 = createTaskWrapper(createRequest(4, 1, 0, 10, 100, 10), true, 100000); + assertEquals(r4, queue.offer(r4)); + } + + @Test(timeout = 60000) public void testWaitQueueComparatorParallelism() throws InterruptedException { TaskWrapper r1 = createTaskWrapper(createRequest(1, 10, 3, 100, 100, 1), false, 100000); TaskWrapper r2 = createTaskWrapper(createRequest(2, 10, 7, 100, 100, 1), false, 100000); diff --git llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestShortestJobFirstComparator.java llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestShortestJobFirstComparator.java index e82f756..3877978 100644 --- llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestShortestJobFirstComparator.java +++ llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestShortestJobFirstComparator.java @@ -180,6 +180,24 @@ public void testWaitQueueComparatorWithinDagPriority() throws InterruptedExcepti } @Test(timeout = 60000) + public void testWaitQueueComparatorWithinSameDagPriority() throws InterruptedException { + TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 1, 0, 10, 100, 10), true, 100000); + TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 1, 0, 10, 100, 10), true, 100000); + TaskWrapper r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 1, 0, 10, 100, 10), true, 100000); + + EvictingPriorityBlockingQueue queue = new EvictingPriorityBlockingQueue<>( + new ShortestJobFirstComparator(), 3); + + assertNull(queue.offer(r1)); + assertNull(queue.offer(r2)); + assertNull(queue.offer(r3)); + + // can not queue more requests as queue is full + TaskWrapper r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 1, 0, 10, 100, 10), true, 100000); + assertEquals(r4, queue.offer(r4)); + } + + @Test(timeout = 60000) public void testWaitQueueComparatorParallelism() throws InterruptedException { TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 10, 3, 10, 100, 1), false, 100000); // 7 pending TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 10, 7, 10, 100, 1), false, 100000); // 3 pending