diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
index 5099a5c..f99c05d 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
@@ -38,6 +38,7 @@
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.hadoop.hive.llap.daemon.FinishableStateUpdateHandler;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.tez.runtime.task.EndReason;
@@ -380,6 +381,7 @@ private boolean trySchedule(final TaskWrapper taskWrapper) {
     try {
       synchronized (lock) {
         boolean canFinish = taskWrapper.getTaskRunnerCallable().canFinish();
+        LOG.info("Attempting to execute {}", taskWrapper);
         ListenableFuture future = executorService.submit(taskWrapper.getTaskRunnerCallable());
         taskWrapper.setIsInWaitQueue(false);
         FutureCallback wrappedCallback = createInternalCompletionListener(taskWrapper);
@@ -584,23 +586,41 @@ private void shutdownExecutor(ExecutorService executorService) {
     public int compare(TaskWrapper t1, TaskWrapper t2) {
       TaskRunnerCallable o1 = t1.getTaskRunnerCallable();
       TaskRunnerCallable o2 = t2.getTaskRunnerCallable();
-      boolean newCanFinish = o1.canFinish();
-      boolean oldCanFinish = o2.canFinish();
-      if (newCanFinish == true && oldCanFinish == false) {
+      boolean o1CanFinish = o1.canFinish();
+      boolean o2CanFinish = o2.canFinish();
+      if (o1CanFinish == true && o2CanFinish == false) {
         return -1;
-      } else if (newCanFinish == false && oldCanFinish == true) {
+      } else if (o1CanFinish == false && o2CanFinish == true) {
         return 1;
       }
-      if (o1.getVertexParallelism() < o2.getVertexParallelism()) {
+      FragmentRuntimeInfo fri1 = o1.getFragmentRuntimeInfo();
+      FragmentRuntimeInfo fri2 = o2.getFragmentRuntimeInfo();
+
+      // Check whether these belong to the same query, and compare withinDagPriority.
+      if (o1.getQueryId().equals(o2.getQueryId())) {
+        // Same query.
+        // Within-dag priority - lower values indicate higher priority.
+        if (fri1.getWithinDagPriority() < fri2.getWithinDagPriority()) {
+          return -1;
+        } else if (fri1.getWithinDagPriority() > fri2.getWithinDagPriority()) {
+          return 1;
+        }
+      }
+
+      // Compute knownPending tasks. selfAndUpstream indicates the task count for the current vertex
+      // and its parent hierarchy. selfAndUpstreamComplete indicates how many of these have completed.
+      int knownPending1 = fri1.getNumSelfAndUpstreamTasks() - fri1.getNumSelfAndUpstreamCompletedTasks();
+      int knownPending2 = fri2.getNumSelfAndUpstreamTasks() - fri2.getNumSelfAndUpstreamCompletedTasks();
+      if (knownPending1 < knownPending2) {
         return -1;
-      } else if (o1.getVertexParallelism() > o2.getVertexParallelism()) {
+      } else if (knownPending1 > knownPending2) {
         return 1;
       }
-      if (o1.getFirstAttemptStartTime() < o2.getFirstAttemptStartTime()) {
+      if (fri1.getFirstAttemptStartTime() < fri2.getFirstAttemptStartTime()) {
         return -1;
-      } else if (o1.getFirstAttemptStartTime() > o2.getFirstAttemptStartTime()) {
+      } else if (fri1.getFirstAttemptStartTime() > fri2.getFirstAttemptStartTime()) {
         return 1;
       }
       return 0;
@@ -610,8 +630,9 @@ public int compare(TaskWrapper t1, TaskWrapper t2) {
   // if map tasks and reduce tasks are in finishable state then priority is given to the task in
   // the following order
   // 1) Dag start time
-  // 2) Attempt start time
-  // 3) Vertex parallelism
+  // 2) Within dag priority
+  // 3) Attempt start time
+  // 4) Vertex parallelism
   @VisibleForTesting
   public static class FirstInFirstOutComparator implements Comparator<TaskWrapper> {
@@ -619,29 +640,47 @@ public int compare(TaskWrapper t1, TaskWrapper t2) {
     public int compare(TaskWrapper t1, TaskWrapper t2) {
       TaskRunnerCallable o1 = t1.getTaskRunnerCallable();
       TaskRunnerCallable o2 = t2.getTaskRunnerCallable();
-      boolean newCanFinish = o1.canFinish();
-      boolean oldCanFinish = o2.canFinish();
-      if (newCanFinish == true && oldCanFinish == false) {
+      boolean o1CanFinish = o1.canFinish();
+      boolean o2CanFinish = o2.canFinish();
+      if (o1CanFinish == true && o2CanFinish == false) {
         return -1;
-      } else if (newCanFinish == false && oldCanFinish == true) {
+      } else if (o1CanFinish == false && o2CanFinish == true) {
         return 1;
       }
-      if (o1.getDagStartTime() < o2.getDagStartTime()) {
+      FragmentRuntimeInfo fri1 = o1.getFragmentRuntimeInfo();
+      FragmentRuntimeInfo fri2 = o2.getFragmentRuntimeInfo();
+
+      if (fri1.getDagStartTime() < fri2.getDagStartTime()) {
         return -1;
-      } else if (o1.getDagStartTime() > o2.getDagStartTime()) {
+      } else if (fri1.getDagStartTime() > fri2.getDagStartTime()) {
         return 1;
       }
-      if (o1.getFirstAttemptStartTime() < o2.getFirstAttemptStartTime()) {
+      // Check whether these belong to the same query, and compare withinDagPriority.
+      if (o1.getQueryId().equals(o2.getQueryId())) {
+        // Same query.
+        // Within-dag priority - lower values indicate higher priority.
+        if (fri1.getWithinDagPriority() < fri2.getWithinDagPriority()) {
+          return -1;
+        } else if (fri1.getWithinDagPriority() > fri2.getWithinDagPriority()) {
+          return 1;
+        }
+      }
+
+      if (fri1.getFirstAttemptStartTime() < fri2.getFirstAttemptStartTime()) {
         return -1;
-      } else if (o1.getFirstAttemptStartTime() > o2.getFirstAttemptStartTime()) {
+      } else if (fri1.getFirstAttemptStartTime() > fri2.getFirstAttemptStartTime()) {
         return 1;
       }
-      if (o1.getVertexParallelism() < o2.getVertexParallelism()) {
+      // Compute knownPending tasks. selfAndUpstream indicates the task count for the current vertex
+      // and its parent hierarchy. selfAndUpstreamComplete indicates how many of these have completed.
+      int knownPending1 = fri1.getNumSelfAndUpstreamTasks() - fri1.getNumSelfAndUpstreamCompletedTasks();
+      int knownPending2 = fri2.getNumSelfAndUpstreamTasks() - fri2.getNumSelfAndUpstreamCompletedTasks();
+      if (knownPending1 < knownPending2) {
         return -1;
-      } else if (o1.getVertexParallelism() > o2.getVertexParallelism()) {
+      } else if (knownPending1 > knownPending2) {
         return 1;
       }
@@ -656,9 +695,12 @@ public int compare(TaskWrapper t1, TaskWrapper t2) {
     public int compare(TaskWrapper t1, TaskWrapper t2) {
       TaskRunnerCallable o1 = t1.getTaskRunnerCallable();
       TaskRunnerCallable o2 = t2.getTaskRunnerCallable();
-      if (o1.getVertexParallelism() > o2.getVertexParallelism()) {
+      FragmentRuntimeInfo fri1 = o1.getFragmentRuntimeInfo();
+      FragmentRuntimeInfo fri2 = o2.getFragmentRuntimeInfo();
+
+      if (fri1.getNumSelfAndUpstreamTasks() > fri2.getNumSelfAndUpstreamTasks()) {
         return 1;
-      } else if (o1.getVertexParallelism() < o2.getVertexParallelism()) {
+      } else if (fri1.getNumSelfAndUpstreamTasks() < fri2.getNumSelfAndUpstreamTasks()) {
         return -1;
       }
       return 0;
@@ -738,8 +780,12 @@ public String toString() {
         ", inPreemptionQueue=" + inPreemptionQueue +
         ", registeredForNotifications=" + registeredForNotifications +
         ", canFinish=" + taskRunnerCallable.canFinish() +
-        ", firstAttemptStartTime=" + taskRunnerCallable.getFirstAttemptStartTime() +
-        ", vertexParallelism=" + taskRunnerCallable.getVertexParallelism() +
+        ", firstAttemptStartTime=" + taskRunnerCallable.getFragmentRuntimeInfo().getFirstAttemptStartTime() +
+        ", dagStartTime=" + taskRunnerCallable.getFragmentRuntimeInfo().getDagStartTime() +
+        ", withinDagPriority=" + taskRunnerCallable.getFragmentRuntimeInfo().getWithinDagPriority() +
+        ", vertexParallelism=" + taskRunnerCallable.getFragmentSpec().getVertexParallelism() +
+        ", selfAndUpstreamParallelism=" + taskRunnerCallable.getFragmentRuntimeInfo().getNumSelfAndUpstreamTasks() +
+        ", selfAndUpstreamComplete=" + taskRunnerCallable.getFragmentRuntimeInfo().getNumSelfAndUpstreamCompletedTasks() +
         '}';
   }
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index 52f21d9..6ceb2e5 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@ -32,6 +32,7 @@
 import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler;
 import org.apache.hadoop.hive.llap.daemon.HistoryLogger;
 import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
@@ -96,6 +97,7 @@
   private volatile String threadName;
   private final LlapDaemonExecutorMetrics metrics;
   private final String requestId;
+  private final String queryId;
   private boolean shouldRunTask = true;
   final Stopwatch runtimeWatch = new Stopwatch();
   final Stopwatch killtimerWatch = new Stopwatch();
@@ -129,7 +131,9 @@
           request.getUser(), jobToken, null, request.getFragmentSpec().getDagName());
     }
     this.metrics = metrics;
-    this.requestId = getRequestId(request);
+    this.requestId = request.getFragmentSpec().getFragmentIdentifierString();
+    // TODO Change this to the queryId/Name when that's available.
+    this.queryId = request.getFragmentSpec().getDagName();
     this.killedTaskHandler = killedTaskHandler;
     this.fragmentCompletionHanler = fragmentCompleteHandler;
   }
@@ -330,8 +334,13 @@ public void shutdown() {
   @Override
   public String toString() {
     return requestId + " {canFinish: " + canFinish() +
-        " vertexParallelism: " + getVertexParallelism() +
-        " firstAttemptStartTime: " + getFirstAttemptStartTime() + "}";
+        ", vertexParallelism: " + request.getFragmentSpec().getVertexParallelism() +
+        ", selfAndUpstreamParallelism: " + request.getFragmentRuntimeInfo().getNumSelfAndUpstreamTasks() +
+        ", selfAndUpstreamComplete: " + request.getFragmentRuntimeInfo().getNumSelfAndUpstreamCompletedTasks() +
+        ", firstAttemptStartTime: " + getFragmentRuntimeInfo().getFirstAttemptStartTime() +
+        ", dagStartTime: " + getFragmentRuntimeInfo().getDagStartTime() +
+        ", withinDagPriority: " + getFragmentRuntimeInfo().getWithinDagPriority() +
+        "}";
   }
   @Override
@@ -347,14 +356,14 @@ public boolean equals(Object obj) {
     return requestId.equals(((TaskRunnerCallable) obj).getRequestId());
   }
-  public int getVertexParallelism() {
-    return request.getFragmentSpec().getVertexParallelism();
-  }
-
   public String getRequestId() {
     return requestId;
   }
+  public String getQueryId() {
+    return queryId;
+  }
+
   public QueryFragmentInfo getFragmentInfo() {
     return fragmentInfo;
   }
@@ -470,16 +479,11 @@ public static String getTaskIdentifierString(
     return sb.toString();
   }
-  private static String getRequestId(SubmitWorkRequestProto request) {
-    return request.getFragmentSpec().getFragmentIdentifierString();
+  public FragmentRuntimeInfo getFragmentRuntimeInfo() {
+    return request.getFragmentRuntimeInfo();
   }
-  public long getFirstAttemptStartTime() {
-    return request.getFragmentRuntimeInfo().getFirstAttemptStartTime();
+  public FragmentSpecProto getFragmentSpec() {
+    return request.getFragmentSpec();
   }
-
-  public long getDagStartTime() {
-    return request.getFragmentRuntimeInfo().getDagStartTime();
-  }
-
 }
diff --git llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
index 25f7a81..7a01b39 100644
--- llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
+++ llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
@@ -67,6 +67,7 @@ public void setup() {
     conf = new Configuration();
   }
+  @Test(timeout = 5000)
   public void testWaitQueueComparator() throws InterruptedException {
     TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100), false, 100000);
@@ -203,6 +204,42 @@ public void testWaitQueueComparator() throws InterruptedException {
   }
   @Test(timeout = 5000)
+  public void testWaitQueueComparatorWithinDagPriority() throws InterruptedException {
+    TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 1, 0, 100, 10), false, 100000);
+    TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 1, 0, 100, 1), false, 100000);
+    TaskWrapper r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 1, 0, 100, 5), false, 100000);
+
+    EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>(
+        new TaskExecutorService.ShortestJobFirstComparator(), 4);
+
+    assertNull(queue.offer(r1));
+    assertNull(queue.offer(r2));
+    assertNull(queue.offer(r3));
+
+    assertEquals(r2, queue.take());
+    assertEquals(r3, queue.take());
+    assertEquals(r1, queue.take());
+  }
+
+  @Test(timeout = 5000)
+  public void testWaitQueueComparatorParallelism() throws InterruptedException {
+    TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 10, 3, 100, 1), false, 100000); // 7 pending
+    TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 10, 7, 100, 1), false, 100000); // 3 pending
+    TaskWrapper r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 10, 5, 100, 1), false, 100000); // 5 pending
+
+    EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>(
+        new TaskExecutorService.ShortestJobFirstComparator(), 4);
+
+    assertNull(queue.offer(r1));
+    assertNull(queue.offer(r2));
+    assertNull(queue.offer(r3));
+
+    assertEquals(r2, queue.take());
+    assertEquals(r3, queue.take());
+    assertEquals(r1, queue.take());
+  }
+
+  @Test(timeout = 5000)
   public void testPreemptionQueueComparator() throws InterruptedException {
     TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100), false, 100000);
     TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200), false, 100000);
@@ -344,8 +381,15 @@ public void testWaitQueuePreemption() throws InterruptedException {
   // ----------- Helper classes and methods go after this point. Tests above this -----------
-  private SubmitWorkRequestProto createSubmitWorkRequestProto(int fragmentNumber, int parallelism,
+  // Create requests with the same within-dag priority.
+  private SubmitWorkRequestProto createSubmitWorkRequestProto(int fragmentNumber, int selfAndUpstreamParallelism,
       long attemptStartTime) {
+    return createSubmitWorkRequestProto(fragmentNumber, selfAndUpstreamParallelism, 0, attemptStartTime, 1);
+  }
+
+  private SubmitWorkRequestProto createSubmitWorkRequestProto(int fragmentNumber, int selfAndUpstreamParallelism,
+      int selfAndUpstreamComplete,
+      long attemptStartTime, int withinDagPriority) {
     ApplicationId appId = ApplicationId.newInstance(9999, 72);
     TezDAGID dagId = TezDAGID.getInstance(appId, 1);
     TezVertexID vId = TezVertexID.getInstance(dagId, 35);
@@ -360,7 +404,6 @@ private SubmitWorkRequestProto createSubmitWorkRequestProto(int fragmentNumber,
         .setDagName("MockDag")
         .setFragmentNumber(fragmentNumber)
         .setVertexName("MockVertex")
-        .setVertexParallelism(parallelism)
         .setProcessorDescriptor(
             EntityDescriptorProto.newBuilder().setClassName("MockProcessor").build())
         .setFragmentIdentifierString(taId.toString()).build()).setAmHost("localhost")
@@ -371,6 +414,9 @@ private SubmitWorkRequestProto createSubmitWorkRequestProto(int fragmentNumber,
         .FragmentRuntimeInfo
         .newBuilder()
         .setFirstAttemptStartTime(attemptStartTime)
+        .setNumSelfAndUpstreamTasks(selfAndUpstreamParallelism)
+        .setNumSelfAndUpstreamCompletedTasks(selfAndUpstreamComplete)
+        .setWithinDagPriority(withinDagPriority)
         .build())
         .build();
   }
diff --git llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService2.java llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService2.java
index ad2a15b..1929439 100644
--- llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService2.java
+++ llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService2.java
@@ -81,8 +81,15 @@ public void setup() {
     conf = new Configuration();
   }
-  private SubmitWorkRequestProto createRequest(int fragmentNumber, int parallelism, int dagStartTime,
-      int attemptStartTime) {
+  private SubmitWorkRequestProto createRequest(int fragmentNumber, int numSelfAndUpstreamTasks, int dagStartTime,
+      int attemptStartTime) {
+    // Same priority for all tasks.
+    return createRequest(fragmentNumber, numSelfAndUpstreamTasks, 0, dagStartTime, attemptStartTime, 1);
+  }
+
+  private SubmitWorkRequestProto createRequest(int fragmentNumber, int numSelfAndUpstreamTasks,
+      int numSelfAndUpstreamComplete, int dagStartTime,
+      int attemptStartTime, int withinDagPriority) {
     ApplicationId appId = ApplicationId.newInstance(9999, 72);
     TezDAGID dagId = TezDAGID.getInstance(appId, 1);
     TezVertexID vId = TezVertexID.getInstance(dagId, 35);
@@ -97,7 +104,6 @@ private SubmitWorkRequestProto createRequest(int fragmentNumber, int parallelism
         .setDagName("MockDag")
         .setFragmentNumber(fragmentNumber)
         .setVertexName("MockVertex")
-        .setVertexParallelism(parallelism)
         .setProcessorDescriptor(
             EntityDescriptorProto.newBuilder().setClassName("MockProcessor").build())
         .setFragmentIdentifierString(taId.toString()).build()).setAmHost("localhost")
@@ -109,6 +115,9 @@ private SubmitWorkRequestProto createRequest(int fragmentNumber, int parallelism
         .newBuilder()
         .setDagStartTime(dagStartTime)
         .setFirstAttemptStartTime(attemptStartTime)
+        .setNumSelfAndUpstreamTasks(numSelfAndUpstreamTasks)
+        .setNumSelfAndUpstreamCompletedTasks(numSelfAndUpstreamComplete)
+        .setWithinDagPriority(withinDagPriority)
         .build())
         .build();
   }
@@ -270,6 +279,43 @@ public void testWaitQueueComparator() throws InterruptedException {
     assertEquals(r2, queue.take());
   }
+  @Test(timeout = 5000)
+  public void testWaitQueueComparatorWithinDagPriority() throws InterruptedException {
+    TaskWrapper r1 = createTaskWrapper(createRequest(1, 1, 0, 100, 100, 10), false, 100000);
+    TaskWrapper r2 = createTaskWrapper(createRequest(2, 1, 0, 100, 100, 1), false, 100000);
+    TaskWrapper r3 = createTaskWrapper(createRequest(3, 1, 0, 100, 100, 5), false, 100000);
+
+    EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>(
+        new TaskExecutorService.ShortestJobFirstComparator(), 4);
+
+    assertNull(queue.offer(r1));
+    assertNull(queue.offer(r2));
+    assertNull(queue.offer(r3));
+
+    assertEquals(r2, queue.take());
+    assertEquals(r3, queue.take());
+    assertEquals(r1, queue.take());
+  }
+
+  @Test(timeout = 5000)
+  public void testWaitQueueComparatorParallelism() throws InterruptedException {
+    TaskWrapper r1 = createTaskWrapper(createRequest(1, 10, 3, 100, 100, 1), false, 100000);
+    TaskWrapper r2 = createTaskWrapper(createRequest(2, 10, 7, 100, 100, 1), false, 100000);
+    TaskWrapper r3 = createTaskWrapper(createRequest(3, 10, 5, 100, 100, 1), false, 100000);
+
+    EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>(
+        new TaskExecutorService.ShortestJobFirstComparator(), 4);
+
+    assertNull(queue.offer(r1));
+    assertNull(queue.offer(r2));
+    assertNull(queue.offer(r3));
+
+    assertEquals(r2, queue.take());
+    assertEquals(r3, queue.take());
+    assertEquals(r1, queue.take());
+  }
+
+
   private TaskWrapper createTaskWrapper(SubmitWorkRequestProto request, boolean canFinish, int workTime) {
     MockRequest mockRequest = new MockRequest(request, canFinish, workTime);
     TaskWrapper taskWrapper = new TaskWrapper(mockRequest, null);
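
Reviewer note (not part of the patch): the standalone Java sketch below mirrors the wait-queue ordering that the patched ShortestJobFirstComparator applies - finishable fragments first, then withinDagPriority for fragments of the same query (lower wins), then fewer known-pending tasks (numSelfAndUpstreamTasks minus numSelfAndUpstreamCompletedTasks), then earlier first-attempt start time. The Fragment class, field names, and constant names here are simplified stand-ins for illustration, not the real LLAP types.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class ShortestJobFirstSketch {

  // Simplified stand-in for TaskWrapper/FragmentRuntimeInfo; names are illustrative only.
  static final class Fragment {
    final String id;
    final boolean canFinish;
    final String queryId;
    final int withinDagPriority;       // lower value = higher priority
    final int selfAndUpstreamTasks;
    final int selfAndUpstreamCompleted;
    final long firstAttemptStartTime;

    Fragment(String id, boolean canFinish, String queryId, int withinDagPriority,
        int selfAndUpstreamTasks, int selfAndUpstreamCompleted, long firstAttemptStartTime) {
      this.id = id;
      this.canFinish = canFinish;
      this.queryId = queryId;
      this.withinDagPriority = withinDagPriority;
      this.selfAndUpstreamTasks = selfAndUpstreamTasks;
      this.selfAndUpstreamCompleted = selfAndUpstreamCompleted;
      this.firstAttemptStartTime = firstAttemptStartTime;
    }
  }

  // Same ordering rules as the patched shortest-job-first comparator, expressed standalone.
  static final Comparator<Fragment> SHORTEST_JOB_FIRST = (f1, f2) -> {
    // Finishable fragments sort ahead of non-finishable ones.
    if (f1.canFinish && !f2.canFinish) {
      return -1;
    } else if (!f1.canFinish && f2.canFinish) {
      return 1;
    }
    // Within the same query, lower withinDagPriority wins.
    if (f1.queryId.equals(f2.queryId)) {
      int priorityCmp = Integer.compare(f1.withinDagPriority, f2.withinDagPriority);
      if (priorityCmp != 0) {
        return priorityCmp;
      }
    }
    // Fewer known-pending (self + upstream) tasks wins.
    int pending1 = f1.selfAndUpstreamTasks - f1.selfAndUpstreamCompleted;
    int pending2 = f2.selfAndUpstreamTasks - f2.selfAndUpstreamCompleted;
    int pendingCmp = Integer.compare(pending1, pending2);
    if (pendingCmp != 0) {
      return pendingCmp;
    }
    // Finally, earlier first-attempt start time wins.
    return Long.compare(f1.firstAttemptStartTime, f2.firstAttemptStartTime);
  };

  public static void main(String[] args) {
    // Mirrors testWaitQueueComparatorParallelism: 7, 3 and 5 pending tasks respectively.
    List<Fragment> fragments = new ArrayList<>();
    fragments.add(new Fragment("r1", false, "q1", 1, 10, 3, 100));
    fragments.add(new Fragment("r2", false, "q1", 1, 10, 7, 100));
    fragments.add(new Fragment("r3", false, "q1", 1, 10, 5, 100));
    fragments.sort(SHORTEST_JOB_FIRST);
    // Expected order: r2 (3 pending), r3 (5 pending), r1 (7 pending).
    fragments.forEach(f -> System.out.println(f.id));
  }
}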