LLAP ShortestJobFirstComparator: for tasks of the same query that share the same
within-DAG priority, break the tie by preferring the attempt with the earliest start time.
The test hunks give r1..r3 distinct start times and check the eviction and take() order.

diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/ShortestJobFirstComparator.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/ShortestJobFirstComparator.java
index 21d70cde82..e75a44935f 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/ShortestJobFirstComparator.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/ShortestJobFirstComparator.java
@@ -29,6 +29,15 @@ public int compareInternal(TaskRunnerCallable o1, TaskRunnerCallable o2) {
     // Check if these belong to the same task, and work with withinDagPriority
     if (o1.getQueryId().equals(o2.getQueryId())) {
       // Same Query
+
+      if (fri1.getWithinDagPriority() == fri2.getWithinDagPriority()) {
+        // task_attempt within same vertex.
+        // Prefer the attempt that has waited longest, i.e. the one with the earliest
+        // start time. Comparing the start times directly yields the same ordering as
+        // comparing (now - startTime) without reading the clock on every comparison.
+        return Long.compare(fri1.getCurrentAttemptStartTime(), fri2.getCurrentAttemptStartTime());
+      }
+
       // Within dag priority - lower values indicate higher priority.
       return Integer.compare(fri1.getWithinDagPriority(), fri2.getWithinDagPriority());
     }
diff --git llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestShortestJobFirstComparator.java llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestShortestJobFirstComparator.java
index 02bb361669..048e1d7510 100644
--- llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestShortestJobFirstComparator.java
+++ llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestShortestJobFirstComparator.java
@@ -200,9 +200,9 @@ public void testWaitQueueComparatorCanFinish() throws InterruptedException {
 
   @Test(timeout = 60000)
   public void testWaitQueueComparatorWithinSameDagPriority() throws InterruptedException {
-    TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 1, 0, 10, 100, 10), true, 100000);
-    TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 1, 0, 10, 100, 10), true, 100000);
-    TaskWrapper r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 1, 0, 10, 100, 10), true, 100000);
+    TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 1, 0, 10, 1, 10), true, 100000);
+    TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 1, 0, 10, 2, 10), true, 100000);
+    TaskWrapper r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 1, 0, 10, 3, 10), true, 100000);
 
     EvictingPriorityBlockingQueue queue = new EvictingPriorityBlockingQueue<>(
         new ShortestJobFirstComparator(), 3);
@@ -214,6 +214,16 @@ public void testWaitQueueComparatorWithinSameDagPriority() throws InterruptedExc
     // can not queue more requests as queue is full
     TaskWrapper r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 1, 0, 10, 100, 10), true, 100000);
     assertEquals(r4, queue.offer(r4, 0));
+
+    // add a task with currentAttemptStartTime earlier than existing tasks; r3 gets evicted
+    TaskWrapper r0 = createTaskWrapper(createSubmitWorkRequestProto(0, 1, 0, 10, 0, 10), true, 100000);
+    assertEquals(r3, queue.offer(r0, 0));
+    // ensure that r0 is picked up as it started much earlier.
+    assertEquals(r0, queue.take());
+
+    // other tasks
+    assertEquals(r1, queue.take());
+    assertEquals(r2, queue.take());
   }
 
   @Test(timeout = 60000)