diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java index 101a69c..926835b 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java @@ -64,4 +64,9 @@ public synchronized E take() throws InterruptedException { public synchronized void remove(E e) { deque.remove(e); } + + @Override + public synchronized String toString() { + return deque.toString(); + } } diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java index 5323f05..599c759 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java @@ -186,6 +186,9 @@ public void schedule(TaskRunnerCallable task) throws RejectedExecutionException LOG.info(task.getRequestId() + " added to wait queue."); } + if (isDebugEnabled) { + LOG.debug("Wait Queue: {}", waitQueue); + } synchronized (waitLock) { waitLock.notify(); } @@ -227,7 +230,7 @@ private boolean trySchedule(final TaskRunnerCallable task) { if (enablePreemption && task.canFinish() && !preemptionQueue.isEmpty()) { if (isDebugEnabled) { - LOG.trace("preemptionQueue: " + preemptionQueue); + LOG.debug("Preemption Queue: " + preemptionQueue); } TaskRunnerCallable pRequest = preemptionQueue.remove(); @@ -334,10 +337,16 @@ public int compare(TaskRunnerCallable o1, TaskRunnerCallable o2) { return 1; } - if (o1.getVertexParallelism() > o2.getVertexParallelism()) { + if (o1.getVertexParallelism() < o2.getVertexParallelism()) { + return -1; + } else if (o1.getVertexParallelism() > o2.getVertexParallelism()) { return 1; - } else if (o1.getVertexParallelism() < o2.getVertexParallelism()) { + } + + if (o1.getFirstAttemptStartTime() < o2.getFirstAttemptStartTime()) { return -1; + } else if (o1.getFirstAttemptStartTime() > o2.getFirstAttemptStartTime()) { + return 1; } return 0; } diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java index d1b1c61..94512d6 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java @@ -98,7 +98,7 @@ private volatile long startTime; private volatile String threadName; private LlapDaemonExecutorMetrics metrics; - protected String requestId; + private final String requestId; private boolean shouldRunTask = true; final Stopwatch runtimeWatch = new Stopwatch(); final Stopwatch killtimerWatch = new Stopwatch(); @@ -335,7 +335,9 @@ public void shutdown() { @Override public String toString() { - return requestId; + return requestId + " {canFinish: " + canFinish() + + " vertexParallelism: " + getVertexParallelism() + + " firstAttemptStartTime: " + getFirstAttemptStartTime() + "}"; } @Override @@ -470,4 +472,8 @@ public static String getTaskIdentifierString( private String getTaskAttemptId(SubmitWorkRequestProto request) { return request.getFragmentSpec().getTaskAttemptIdString(); } + + public long getFirstAttemptStartTime() { + return request.getFragmentRuntimeInfo().getFirstAttemptStartTime(); + } } diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java index 740a2ca..f0e53a7 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java @@ -80,7 +80,7 @@ public void setup() { conf = new Configuration(); } - private SubmitWorkRequestProto createRequest(int fragmentNumber, int parallelism) { + private SubmitWorkRequestProto createRequest(int fragmentNumber, int parallelism, int attemptStartTime) { ApplicationId appId = ApplicationId.newInstance(9999, 72); TezDAGID dagId = TezDAGID.getInstance(appId, 1); TezVertexID vId = TezVertexID.getInstance(dagId, 35); @@ -101,16 +101,22 @@ private SubmitWorkRequestProto createRequest(int fragmentNumber, int parallelism .setTaskAttemptIdString(taId.toString()).build()).setAmHost("localhost") .setAmPort(12345).setAppAttemptNumber(0).setApplicationIdString("MockApp_1") .setContainerIdString("MockContainer_1").setUser("MockUser") - .setTokenIdentifier("MockToken_1").build(); + .setTokenIdentifier("MockToken_1") + .setFragmentRuntimeInfo(LlapDaemonProtocolProtos + .FragmentRuntimeInfo + .newBuilder() + .setFirstAttemptStartTime(attemptStartTime) + .build()) + .build(); } @Test public void testWaitQueueComparator() throws InterruptedException { - MockRequest r1 = new MockRequest(createRequest(1, 2), false, 100000); - MockRequest r2 = new MockRequest(createRequest(2, 4), false, 100000); - MockRequest r3 = new MockRequest(createRequest(3, 6), false, 1000000); - MockRequest r4 = new MockRequest(createRequest(4, 8), false, 1000000); - MockRequest r5 = new MockRequest(createRequest(5, 10), false, 1000000); + MockRequest r1 = new MockRequest(createRequest(1, 2, 100), false, 100000); + MockRequest r2 = new MockRequest(createRequest(2, 4, 200), false, 100000); + MockRequest r3 = new MockRequest(createRequest(3, 6, 300), false, 1000000); + MockRequest r4 = new MockRequest(createRequest(4, 8, 400), false, 1000000); + MockRequest r5 = new MockRequest(createRequest(5, 10, 500), false, 1000000); EvictingPriorityBlockingQueue queue = new EvictingPriorityBlockingQueue( new TaskExecutorService.WaitQueueComparator(), 4); assertNull(queue.offer(r1)); @@ -128,11 +134,11 @@ public void testWaitQueueComparator() throws InterruptedException { assertEquals(r3, queue.take()); assertEquals(r4, queue.take()); - r1 = new MockRequest(createRequest(1, 2), true, 100000); - r2 = new MockRequest(createRequest(2, 4), true, 100000); - r3 = new MockRequest(createRequest(3, 6), true, 1000000); - r4 = new MockRequest(createRequest(4, 8), true, 1000000); - r5 = new MockRequest(createRequest(5, 10), true, 1000000); + r1 = new MockRequest(createRequest(1, 2, 100), true, 100000); + r2 = new MockRequest(createRequest(2, 4, 200), true, 100000); + r3 = new MockRequest(createRequest(3, 6, 300), true, 1000000); + r4 = new MockRequest(createRequest(4, 8, 400), true, 1000000); + r5 = new MockRequest(createRequest(5, 10, 500), true, 1000000); queue = new EvictingPriorityBlockingQueue( new TaskExecutorService.WaitQueueComparator(), 4); assertNull(queue.offer(r1)); @@ -150,11 +156,11 @@ public void testWaitQueueComparator() throws InterruptedException { assertEquals(r3, queue.take()); assertEquals(r4, queue.take()); - r1 = new MockRequest(createRequest(1, 1), true, 100000); - r2 = new MockRequest(createRequest(2, 1), false, 100000); - r3 = new MockRequest(createRequest(3, 1), true, 1000000); - r4 = new MockRequest(createRequest(4, 1), false, 1000000); - r5 = new MockRequest(createRequest(5, 10), true, 1000000); + r1 = new MockRequest(createRequest(1, 1, 100), true, 100000); + r2 = new MockRequest(createRequest(2, 1, 200), false, 100000); + r3 = new MockRequest(createRequest(3, 1, 300), true, 1000000); + r4 = new MockRequest(createRequest(4, 1, 400), false, 1000000); + r5 = new MockRequest(createRequest(5, 10, 500), true, 1000000); queue = new EvictingPriorityBlockingQueue( new TaskExecutorService.WaitQueueComparator(), 4); assertNull(queue.offer(r1)); @@ -162,23 +168,21 @@ public void testWaitQueueComparator() throws InterruptedException { assertNull(queue.offer(r2)); assertEquals(r1, queue.peek()); assertNull(queue.offer(r3)); - // same priority with r1 - assertEquals(r3, queue.peek()); - // same priority with r2 + assertEquals(r1, queue.peek()); assertNull(queue.offer(r4)); - assertEquals(r3, queue.peek()); + assertEquals(r1, queue.peek()); // offer accepted and r2 gets evicted - assertEquals(r2, queue.offer(r5)); - assertEquals(r3, queue.take()); + assertEquals(r4, queue.offer(r5)); assertEquals(r1, queue.take()); + assertEquals(r3, queue.take()); assertEquals(r5, queue.take()); - assertEquals(r4, queue.take()); + assertEquals(r2, queue.take()); - r1 = new MockRequest(createRequest(1, 2), true, 100000); - r2 = new MockRequest(createRequest(2, 4), false, 100000); - r3 = new MockRequest(createRequest(3, 6), true, 1000000); - r4 = new MockRequest(createRequest(4, 8), false, 1000000); - r5 = new MockRequest(createRequest(5, 10), true, 1000000); + r1 = new MockRequest(createRequest(1, 2, 100), true, 100000); + r2 = new MockRequest(createRequest(2, 4, 200), false, 100000); + r3 = new MockRequest(createRequest(3, 6, 300), true, 1000000); + r4 = new MockRequest(createRequest(4, 8, 400), false, 1000000); + r5 = new MockRequest(createRequest(5, 10, 500), true, 1000000); queue = new EvictingPriorityBlockingQueue( new TaskExecutorService.WaitQueueComparator(), 4); assertNull(queue.offer(r1)); @@ -196,11 +200,11 @@ public void testWaitQueueComparator() throws InterruptedException { assertEquals(r5, queue.take()); assertEquals(r2, queue.take()); - r1 = new MockRequest(createRequest(1, 2), true, 100000); - r2 = new MockRequest(createRequest(2, 4), false, 100000); - r3 = new MockRequest(createRequest(3, 6), false, 1000000); - r4 = new MockRequest(createRequest(4, 8), false, 1000000); - r5 = new MockRequest(createRequest(5, 10), true, 1000000); + r1 = new MockRequest(createRequest(1, 2, 100), true, 100000); + r2 = new MockRequest(createRequest(2, 4, 200), false, 100000); + r3 = new MockRequest(createRequest(3, 6, 300), false, 1000000); + r4 = new MockRequest(createRequest(4, 8, 400), false, 1000000); + r5 = new MockRequest(createRequest(5, 10, 500), true, 1000000); queue = new EvictingPriorityBlockingQueue( new TaskExecutorService.WaitQueueComparator(), 4); assertNull(queue.offer(r1)); @@ -218,11 +222,11 @@ public void testWaitQueueComparator() throws InterruptedException { assertEquals(r2, queue.take()); assertEquals(r3, queue.take()); - r1 = new MockRequest(createRequest(1, 2), false, 100000); - r2 = new MockRequest(createRequest(2, 4), true, 100000); - r3 = new MockRequest(createRequest(3, 6), true, 1000000); - r4 = new MockRequest(createRequest(4, 8), true, 1000000); - r5 = new MockRequest(createRequest(5, 10), true, 1000000); + r1 = new MockRequest(createRequest(1, 2, 100), false, 100000); + r2 = new MockRequest(createRequest(2, 4, 200), true, 100000); + r3 = new MockRequest(createRequest(3, 6, 300), true, 1000000); + r4 = new MockRequest(createRequest(4, 8, 400), true, 1000000); + r5 = new MockRequest(createRequest(5, 10, 500), true, 1000000); queue = new EvictingPriorityBlockingQueue( new TaskExecutorService.WaitQueueComparator(), 4); assertNull(queue.offer(r1)); @@ -243,10 +247,10 @@ public void testWaitQueueComparator() throws InterruptedException { @Test public void testPreemptionQueueComparator() throws InterruptedException { - MockRequest r1 = new MockRequest(createRequest(1, 2), false, 100000); - MockRequest r2 = new MockRequest(createRequest(2, 4), false, 100000); - MockRequest r3 = new MockRequest(createRequest(3, 6), false, 1000000); - MockRequest r4 = new MockRequest(createRequest(4, 8), false, 1000000); + MockRequest r1 = new MockRequest(createRequest(1, 2, 100), false, 100000); + MockRequest r2 = new MockRequest(createRequest(2, 4, 200), false, 100000); + MockRequest r3 = new MockRequest(createRequest(3, 6, 300), false, 1000000); + MockRequest r4 = new MockRequest(createRequest(4, 8, 400), false, 1000000); BlockingQueue queue = new PriorityBlockingQueue(4, new TaskExecutorService.PreemptionQueueComparator()); queue.offer(r1); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java index 755f847..4e74a46 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java @@ -75,14 +75,14 @@ public class TezJobMonitor { private static final String CLASS_NAME = TezJobMonitor.class.getName(); - private static final int MIN_TERMINAL_WIDTH = 92; + private static final int MIN_TERMINAL_WIDTH = 94; private static final int COLUMN_1_WIDTH = 16; private static final int SEPARATOR_WIDTH = MIN_TERMINAL_WIDTH; // keep this within 80 chars width. If more columns needs to be added then update min terminal // width requirement and separator width accordingly - private static final String HEADER_FORMAT = "%16s%10s %11s %5s %9s %7s %7s %6s %6s "; - private static final String VERTEX_FORMAT = "%-16s%10s %11s %5s %9s %7s %7s %6s %6s "; + private static final String HEADER_FORMAT = "%16s%10s %13s %5s %9s %7s %7s %6s %6s "; + private static final String VERTEX_FORMAT = "%-16s%10s %13s %5s %9s %7s %7s %6s %6s "; private static final String FOOTER_FORMAT = "%-15s %-30s %-4s %-25s"; private static final String HEADER = String.format(HEADER_FORMAT, "VERTICES", "MODE", "STATUS", "TOTAL", "COMPLETED", "RUNNING", "PENDING", "FAILED", "KILLED");