diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/SchedulerFragmentCompletingListener.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/SchedulerFragmentCompletingListener.java new file mode 100644 index 0000000..19e1379 --- /dev/null +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/SchedulerFragmentCompletingListener.java @@ -0,0 +1,25 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap.daemon; + +public interface SchedulerFragmentCompletingListener { + + /** + * Indicates that a fragment is about to complete. 
+ * @param fragmentId + */ + void fragmentCompleting(String fragmentId); + +} diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java index 8c33fa2..6908138 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hive.llap.daemon.HistoryLogger; import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler; import org.apache.hadoop.hive.llap.daemon.QueryFailedHandler; +import org.apache.hadoop.hive.llap.daemon.SchedulerFragmentCompletingListener; import org.apache.hadoop.hive.llap.daemon.impl.LlapTokenChecker.LlapTokenInfo; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GroupInputSpecProto; @@ -93,6 +94,7 @@ private final AMReporter amReporter; private final QueryTracker queryTracker; private final Scheduler executorService; + private final SchedulerFragmentCompletingListener completionListener; private final AtomicReference localAddress; private final AtomicReference localShufflePort; private final Map localEnv = new HashMap<>(); @@ -129,6 +131,7 @@ public ContainerRunnerImpl(Configuration conf, int numExecutors, int waitQueueSi conf, ConfVars.LLAP_DAEMON_WAIT_QUEUE_COMPARATOR_CLASS_NAME); this.executorService = new TaskExecutorService(numExecutors, waitQueueSize, waitQueueSchedulerClassName, enablePreemption, classLoader, metrics); + completionListener = (SchedulerFragmentCompletingListener) executorService; addIfService(executorService); @@ -251,7 +254,8 @@ public SubmitWorkResponseProto submitWork(SubmitWorkRequestProto request) throws TaskRunnerCallable callable = new TaskRunnerCallable(request, fragmentInfo, callableConf, new 
ExecutionContextImpl(localAddress.get().getHostName()), env, credentials, memoryPerExecutor, amReporter, confParams, metrics, killedTaskHandler, - this, tezHadoopShim, attemptId, vertex, initialEvent, taskUgi); + this, tezHadoopShim, attemptId, vertex, initialEvent, taskUgi, + completionListener); submissionState = executorService.schedule(callable); if (LOG.isInfoEnabled()) { diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java index adc86ea..ac966a0 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java @@ -36,16 +36,26 @@ private final PriorityBlockingDeque deque; private final Comparator comparator; + private final int waitQueueSize; + + private int currentSize = 0; public EvictingPriorityBlockingQueue(Comparator comparator, int maxSize) { - this.deque = new PriorityBlockingDeque<>(comparator, maxSize); + this.deque = new PriorityBlockingDeque<>(comparator); + this.waitQueueSize = maxSize; this.comparator = comparator; } - public synchronized E offer(E e) { - if (deque.offer(e)) { + // TODO: Write a test for this. + + public synchronized E offer(E e, int additionalElementsAllowed) { + if (currentSize < waitQueueSize + additionalElementsAllowed) { + // Capacity exists. + deque.offer(e); + currentSize++; return null; } else { + // No capacity. Check if an element needs to be evicted. 
E last = deque.peekLast(); if (comparator.compare(e, last) < 0) { deque.removeLast(); @@ -64,7 +74,7 @@ public synchronized E offer(E e) { } public synchronized boolean isEmpty() { - return deque.isEmpty(); + return currentSize == 0; } public synchronized E peek() { @@ -72,19 +82,49 @@ public synchronized E peek() { } public synchronized E take() throws InterruptedException { - return deque.take(); + E e = deque.take(); + currentSize--; // Decrement only if an element was removed. + return e; } public synchronized boolean remove(E e) { - return deque.remove(e); + boolean removed = deque.remove(e); + if (removed) { + currentSize--; + } + return removed; + } + + /** + * Re-insert an element if it exists (mainly to force a re-order). Works on the underlying deque directly, so currentSize is unchanged. + * @param e the element to re-insert + * @return false if the element was not found. true otherwise. + */ + public synchronized boolean reinsertIfExists(E e) { + if (deque.remove(e)) { + if (!deque.offer(e)) { + LOG.error( + "Failed to re-insert element into queue with capacity available. size={}, element={}", + size(), e); + throw new RuntimeException( + "Failed to re-insert element into queue with capacity available. 
size=" + + size()); + } + return true; + } else { + return false; + } } public synchronized int size() { - return deque.size(); + return currentSize; } @Override public synchronized String toString() { - return deque.toString(); + StringBuilder sb = new StringBuilder(); + sb.append("currentSize=").append(size()).append(", queue=") + .append(deque.toString()); + return sb.toString(); } } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java index 39b4b0e..1665fe5 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java @@ -35,6 +35,7 @@ import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.hadoop.hive.llap.counters.FragmentCountersMap; +import org.apache.hadoop.hive.llap.daemon.SchedulerFragmentCompletingListener; import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol; import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.TezException; @@ -82,15 +83,19 @@ private final String containerIdStr; private final String fragmentId; private final TezEvent initialEvent; + private final SchedulerFragmentCompletingListener completionListener; + // The same id as reported by TaskRunnerCallable.getRequestId + private final String fragmentRequestId; private final ListeningExecutorService heartbeatExecutor; @VisibleForTesting HeartbeatCallable currentCallable; - public LlapTaskReporter(LlapTaskUmbilicalProtocol umbilical, long amPollInterval, + public LlapTaskReporter(SchedulerFragmentCompletingListener completionListener, LlapTaskUmbilicalProtocol umbilical, long amPollInterval, long sendCounterInterval, int maxEventsToGet, AtomicLong requestCounter, - String containerIdStr, final String fragmentId, TezEvent initialEvent) { + String containerIdStr, final String fragmentId, 
TezEvent initialEvent, + String fragmentRequestId) { this.umbilical = umbilical; this.pollInterval = amPollInterval; this.sendCounterInterval = sendCounterInterval; @@ -102,6 +107,8 @@ public LlapTaskReporter(LlapTaskUmbilicalProtocol umbilical, long amPollInterval ExecutorService executor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder() .setDaemon(true).setNameFormat("TaskHeartbeatThread").build()); heartbeatExecutor = MoreExecutors.listeningDecorator(executor); + this.completionListener = completionListener; + this.fragmentRequestId = fragmentRequestId; } /** @@ -113,8 +120,8 @@ public synchronized void registerTask(RuntimeTask task, TezCounters tezCounters = task.addAndGetTezCounter(fragmentId); FragmentCountersMap.registerCountersForFragment(fragmentId, tezCounters); LOG.info("Registered counters for fragment: {} vertexName: {}", fragmentId, task.getVertexName()); - currentCallable = new HeartbeatCallable(task, umbilical, pollInterval, sendCounterInterval, - maxEventsToGet, requestCounter, containerIdStr, initialEvent); + currentCallable = new HeartbeatCallable(completionListener, task, umbilical, pollInterval, sendCounterInterval, + maxEventsToGet, requestCounter, containerIdStr, initialEvent, fragmentRequestId); ListenableFuture future = heartbeatExecutor.submit(currentCallable); Futures.addCallback(future, new HeartbeatCallback(errorReporter)); } @@ -144,6 +151,8 @@ public void shutdown() { private final RuntimeTask task; private final EventMetaData updateEventMetadata; + private final SchedulerFragmentCompletingListener completionListener; + private final String fragmentRequestId; private final LlapTaskUmbilicalProtocol umbilical; @@ -174,9 +183,12 @@ public void shutdown() { private int prevCounterSendHeartbeatNum = 0; private TezEvent initialEvent; - public HeartbeatCallable(RuntimeTask task, LlapTaskUmbilicalProtocol umbilical, + public HeartbeatCallable( + SchedulerFragmentCompletingListener completionListener, + RuntimeTask task, 
LlapTaskUmbilicalProtocol umbilical, long amPollInterval, long sendCounterInterval, int maxEventsToGet, - AtomicLong requestCounter, String containerIdStr, TezEvent initialEvent) { + AtomicLong requestCounter, String containerIdStr, + TezEvent initialEvent, String fragmentRequestId) { this.pollInterval = amPollInterval; this.sendCounterInterval = sendCounterInterval; @@ -184,6 +196,8 @@ public HeartbeatCallable(RuntimeTask task, LlapTaskUmbilicalProtocol umbilical, this.requestCounter = requestCounter; this.containerIdStr = containerIdStr; this.initialEvent = initialEvent; + this.completionListener = completionListener; + this.fragmentRequestId = fragmentRequestId; this.task = task; this.umbilical = umbilical; @@ -367,6 +381,10 @@ private boolean taskSucceeded(TezTaskAttemptID taskAttemptID) throws IOException TezEvent statusUpdateEvent = new TezEvent(getStatusUpdateEvent(true), updateEventMetadata); TezEvent taskCompletedEvent = new TezEvent(new TaskAttemptCompletedEvent(), updateEventMetadata); + if (LOG.isDebugEnabled()) { + LOG.debug("Invoking OOB heartbeat for successful attempt: {}, isTaskDone={}", taskAttemptID, task.isTaskDone()); + } + completionListener.fragmentCompleting(fragmentRequestId); return !heartbeat(Lists.newArrayList(statusUpdateEvent, taskCompletedEvent)).shouldDie; } else { LOG.warn("A final task state event has already been sent. Not sending again"); @@ -431,6 +449,12 @@ private boolean taskTerminated(TezTaskAttemptID taskAttemptID, boolean isKilled, // Counter may exceed limitation LOG.warn("Error when get constructing TaskStatusUpdateEvent. Not sending it out"); } + if (LOG.isDebugEnabled()) { + LOG.debug( + "Invoking OOB heartbeat for failed/killed attempt: {}, isTaskDone={}, isKilled={}", + taskAttemptID, task.isTaskDone(), isKilled); + } + completionListener.fragmentCompleting(fragmentRequestId); return !heartbeat(tezEvents).shouldDie; } else { LOG.warn("A final task state event has already been sent. 
Not sending again"); diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/PriorityBlockingDeque.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/PriorityBlockingDeque.java index e27efa5..3bf51cd 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/PriorityBlockingDeque.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/PriorityBlockingDeque.java @@ -81,6 +81,10 @@ public PriorityBlockingDeque(int capacity) { this(null, capacity); } + public PriorityBlockingDeque(Comparator comparator) { + this(comparator, Integer.MAX_VALUE); + } + public PriorityBlockingDeque(Comparator comparator, int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java index 58863af..ddd64e9 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java @@ -42,6 +42,7 @@ import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.hadoop.hive.llap.daemon.FinishableStateUpdateHandler; +import org.apache.hadoop.hive.llap.daemon.SchedulerFragmentCompletingListener; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec; import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics; @@ -78,7 +79,8 @@ * Task executor service can be shut down which will terminated all running tasks and reject all * new tasks. Shutting down of the task executor service can be done gracefully or immediately. 
*/ -public class TaskExecutorService extends AbstractService implements Scheduler { +public class TaskExecutorService extends AbstractService + implements Scheduler, SchedulerFragmentCompletingListener { private static final Logger LOG = LoggerFactory.getLogger(TaskExecutorService.class); private static final boolean isInfoEnabled = LOG.isInfoEnabled(); private static final boolean isDebugEnabled = LOG.isDebugEnabled(); @@ -100,6 +102,12 @@ private final boolean enablePreemption; private final ThreadPoolExecutor threadPoolExecutor; private final AtomicInteger numSlotsAvailable; + private final int maxParallelExecutors; + + // Tracks running fragments, and completing fragments. + // Completing since we have a race in the AM being notified and the task actually + // falling off, and the executor service being ready to schedule a new task. + private final AtomicInteger runningFragmentCount = new AtomicInteger(0); @VisibleForTesting @@ -121,6 +129,7 @@ public TaskExecutorService(int numExecutors, int waitQueueSize, final Comparator waitQueueComparator = createComparator( waitQueueComparatorClassName); + this.maxParallelExecutors = numExecutors; this.waitQueue = new EvictingPriorityBlockingQueue<>(waitQueueComparator, waitQueueSize); this.threadPoolExecutor = new ThreadPoolExecutor(numExecutors, // core pool size numExecutors, // max pool size @@ -344,8 +353,12 @@ public SubmissionState schedule(TaskRunnerCallable task) { // TODO HIVE-11687 It's possible for a bunch of tasks to come in around the same time, without the // actual executor threads picking up any work. This will lead to unnecessary rejection of tasks. 
// The wait queue should be able to fit at least (waitQueue + currentFreeExecutor slots) + LOG.debug( + "Offering to wait queue with: waitQueueSize={}, numSlotsAvailable={}, runningFragmentCount={}", + waitQueue.size(), numSlotsAvailable.get(), + runningFragmentCount.get()); canFinish = taskWrapper.getTaskRunnerCallable().canFinish(); - evictedTask = waitQueue.offer(taskWrapper); + evictedTask = waitQueue.offer(taskWrapper, Math.max(0, maxParallelExecutors - runningFragmentCount.get())); // Finishable state is checked on the task, via an explicit query to the TaskRunnerCallable // null evicted task means offer accepted @@ -366,7 +379,9 @@ public SubmissionState schedule(TaskRunnerCallable task) { } } else { if (isInfoEnabled) { - LOG.info("wait queue full, size={}. {} not added", waitQueue.size(), task.getRequestId()); + LOG.info( + "wait queue full, size={}. numSlotsAvailable={}. {} not added", + waitQueue.size(), numSlotsAvailable.get(), task.getRequestId()); } evictedTask.getTaskRunnerCallable().killTask(); @@ -473,6 +488,11 @@ public void killFragment(String fragmentId) { } } + @Override + public void fragmentCompleting(String fragmentId) { + runningFragmentCount.decrementAndGet(); + } + @VisibleForTesting void trySchedule(final TaskWrapper taskWrapper) throws RejectedExecutionException { @@ -481,6 +501,7 @@ void trySchedule(final TaskWrapper taskWrapper) throws RejectedExecutionExceptio LOG.info("Attempting to execute {}", taskWrapper); ListenableFuture future = executorService.submit( taskWrapper.getTaskRunnerCallable()); + runningFragmentCount.incrementAndGet(); taskWrapper.setIsInWaitQueue(false); FutureCallback wrappedCallback = createInternalCompletionListener( taskWrapper); @@ -547,10 +568,8 @@ private void finishableStateUpdated(TaskWrapper taskWrapper, boolean newFinishab // Re-order the wait queue LOG.debug("Re-ordering the wait queue since {} finishable state moved to {}", taskWrapper.getRequestId(), newFinishableState); - if (waitQueue.remove(taskWrapper)) 
{ - // Put element back only if it existed. - waitQueue.offer(taskWrapper); - } else { + boolean reInserted = waitQueue.reinsertIfExists(taskWrapper); + if (!reInserted) { LOG.warn("Failed to remove {} from waitQueue", taskWrapper.getTaskRunnerCallable().getRequestId()); } @@ -653,6 +672,7 @@ public InternalCompletionListener(TaskWrapper taskWrapper) { @Override public void onSuccess(TaskRunner2Result result) { + LOG.debug("Received successful completion for: {}", taskWrapper.getRequestId()); knownTasks.remove(taskWrapper.getRequestId()); taskWrapper.setIsInPreemptableQueue(false); taskWrapper.maybeUnregisterForFinishedStateNotifications(); @@ -662,6 +682,7 @@ public void onSuccess(TaskRunner2Result result) { @Override public void onFailure(Throwable t) { + LOG.debug("Received failed completion for: {}", taskWrapper.getRequestId()); knownTasks.remove(taskWrapper.getRequestId()); taskWrapper.setIsInPreemptableQueue(false); taskWrapper.maybeUnregisterForFinishedStateNotifications(); diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java index bfb155a..2b3ab2e 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler; import org.apache.hadoop.hive.llap.daemon.HistoryLogger; import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler; +import org.apache.hadoop.hive.llap.daemon.SchedulerFragmentCompletingListener; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec; @@ -114,16 +115,17 @@ private final AtomicBoolean 
killInvoked = new AtomicBoolean(false); private final SignableVertexSpec vertex; private final TezEvent initialEvent; + private final SchedulerFragmentCompletingListener completionListener; private UserGroupInformation taskUgi; @VisibleForTesting public TaskRunnerCallable(SubmitWorkRequestProto request, QueryFragmentInfo fragmentInfo, - Configuration conf, ExecutionContext executionContext, Map envMap, - Credentials credentials, long memoryAvailable, AMReporter amReporter, ConfParams confParams, - LlapDaemonExecutorMetrics metrics, KilledTaskHandler killedTaskHandler, - FragmentCompletionHandler fragmentCompleteHandler, HadoopShim tezHadoopShim, - TezTaskAttemptID attemptId, SignableVertexSpec vertex, TezEvent initialEvent, - UserGroupInformation taskUgi) { + Configuration conf, ExecutionContext executionContext, Map envMap, + Credentials credentials, long memoryAvailable, AMReporter amReporter, ConfParams confParams, + LlapDaemonExecutorMetrics metrics, KilledTaskHandler killedTaskHandler, + FragmentCompletionHandler fragmentCompleteHandler, HadoopShim tezHadoopShim, + TezTaskAttemptID attemptId, SignableVertexSpec vertex, TezEvent initialEvent, + UserGroupInformation taskUgi, SchedulerFragmentCompletingListener completionListener) { this.request = request; this.fragmentInfo = fragmentInfo; this.conf = conf; @@ -152,6 +154,7 @@ public TaskRunnerCallable(SubmitWorkRequestProto request, QueryFragmentInfo frag this.tezHadoopShim = tezHadoopShim; this.initialEvent = initialEvent; this.taskUgi = taskUgi; + this.completionListener = completionListener; } public long getStartTime() { @@ -219,6 +222,7 @@ public LlapTaskUmbilicalProtocol run() throws Exception { String fragmentId = LlapTezUtils.stripAttemptPrefix(taskSpec.getTaskAttemptID().toString()); taskReporter = new LlapTaskReporter( + completionListener, umbilical, confParams.amHeartbeatIntervalMsMax, confParams.amCounterHeartbeatInterval, @@ -226,7 +230,8 @@ public LlapTaskUmbilicalProtocol run() throws Exception { 
new AtomicLong(0), request.getContainerIdString(), fragmentId, - initialEvent); + initialEvent, + requestId); String attemptId = fragmentInfo.getFragmentIdentifierString(); IOContextMap.setThreadAttemptId(attemptId); @@ -297,9 +302,14 @@ public void killTask() { killtimerWatch.start(); LOG.info("Issuing kill to task {}", taskSpec.getTaskAttemptID()); boolean killed = taskRunner.killTask(); + if (killed) { // Sending a kill message to the AM right here. Don't need to wait for the task to complete. LOG.info("Kill request for task {} completed. Informing AM", ta); + // Inform the scheduler that this fragment has been killed. + // If the kill failed - that means the task has already hit a final condition, + // and a notification comes from the LlapTaskReporter + completionListener.fragmentCompleting(getRequestId()); reportTaskKilled(); } else { LOG.info("Kill request for task {} did not complete because the task is already complete", diff --git llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java index 6506d07..f6e1e3c 100644 --- llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java +++ llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java @@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler; import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler; +import org.apache.hadoop.hive.llap.daemon.SchedulerFragmentCompletingListener; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec; @@ -170,7 +171,8 @@ public MockRequest(SubmitWorkRequestProto requestProto, QueryFragmentInfo fragme 
LlapDaemonExecutorMetrics.class), mock(KilledTaskHandler.class), mock( FragmentCompletionHandler.class), new DefaultHadoopShim(), null, - requestProto.getWorkSpec().getVertex(), initialEvent, null); + requestProto.getWorkSpec().getVertex(), initialEvent, null, mock( + SchedulerFragmentCompletingListener.class)); this.workTime = workTime; this.canFinish = canFinish; } @@ -285,4 +287,5 @@ private static void logInfo(String message) { logInfo(message, null); } + } diff --git llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java index 8cce0cb..79c2564 100644 --- llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java +++ llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java @@ -107,16 +107,16 @@ public void testWaitQueueComparator() throws InterruptedException { TaskWrapper r5 = createTaskWrapper(createRequest(5, 10, 1, 500), false, 1000000); EvictingPriorityBlockingQueue queue = new EvictingPriorityBlockingQueue<>( new FirstInFirstOutComparator(), 4); - assertNull(queue.offer(r1)); + assertNull(queue.offer(r1, 0)); assertEquals(r1, queue.peek()); - assertNull(queue.offer(r2)); + assertNull(queue.offer(r2, 0)); assertEquals(r2, queue.peek()); - assertNull(queue.offer(r3)); + assertNull(queue.offer(r3, 0)); assertEquals(r3, queue.peek()); - assertNull(queue.offer(r4)); + assertNull(queue.offer(r4, 0)); assertEquals(r4, queue.peek()); // this offer will be accepted and r1 evicted - assertEquals(r1, queue.offer(r5)); + assertEquals(r1, queue.offer(r5, 0)); assertEquals(r5, queue.take()); assertEquals(r4, queue.take()); assertEquals(r3, queue.take()); @@ -129,16 +129,16 @@ public void testWaitQueueComparator() throws InterruptedException { r5 = createTaskWrapper(createRequest(5, 10, 1, 500), true, 1000000); queue 
= new EvictingPriorityBlockingQueue( new FirstInFirstOutComparator(), 4); - assertNull(queue.offer(r1)); + assertNull(queue.offer(r1, 0)); assertEquals(r1, queue.peek()); - assertNull(queue.offer(r2)); + assertNull(queue.offer(r2, 0)); assertEquals(r2, queue.peek()); - assertNull(queue.offer(r3)); + assertNull(queue.offer(r3, 0)); assertEquals(r3, queue.peek()); - assertNull(queue.offer(r4)); + assertNull(queue.offer(r4, 0)); assertEquals(r4, queue.peek()); // this offer will be accpeted and r1 evicted - assertEquals(r1, queue.offer(r5)); + assertEquals(r1, queue.offer(r5, 0)); assertEquals(r5, queue.take()); assertEquals(r4, queue.take()); assertEquals(r3, queue.take()); @@ -151,16 +151,16 @@ public void testWaitQueueComparator() throws InterruptedException { r5 = createTaskWrapper(createRequest(5, 10, 1, 500), true, 1000000); queue = new EvictingPriorityBlockingQueue( new FirstInFirstOutComparator(), 4); - assertNull(queue.offer(r1)); + assertNull(queue.offer(r1, 0)); assertEquals(r1, queue.peek()); - assertNull(queue.offer(r2)); + assertNull(queue.offer(r2, 0)); assertEquals(r1, queue.peek()); - assertNull(queue.offer(r3)); + assertNull(queue.offer(r3, 0)); assertEquals(r3, queue.peek()); - assertNull(queue.offer(r4)); + assertNull(queue.offer(r4, 0)); assertEquals(r3, queue.peek()); // offer accepted and r2 gets evicted - assertEquals(r2, queue.offer(r5)); + assertEquals(r2, queue.offer(r5, 0)); assertEquals(r5, queue.take()); assertEquals(r3, queue.take()); assertEquals(r1, queue.take()); @@ -173,16 +173,16 @@ public void testWaitQueueComparator() throws InterruptedException { r5 = createTaskWrapper(createRequest(5, 10, 1, 500), true, 1000000); queue = new EvictingPriorityBlockingQueue( new FirstInFirstOutComparator(), 4); - assertNull(queue.offer(r1)); + assertNull(queue.offer(r1, 0)); assertEquals(r1, queue.peek()); - assertNull(queue.offer(r2)); + assertNull(queue.offer(r2, 0)); assertEquals(r1, queue.peek()); - assertNull(queue.offer(r3)); + 
assertNull(queue.offer(r3, 0)); assertEquals(r3, queue.peek()); - assertNull(queue.offer(r4)); + assertNull(queue.offer(r4, 0)); assertEquals(r3, queue.peek()); // offer accepted and r2 gets evicted - assertEquals(r2, queue.offer(r5)); + assertEquals(r2, queue.offer(r5, 0)); assertEquals(r5, queue.take()); assertEquals(r3, queue.take()); assertEquals(r1, queue.take()); @@ -195,16 +195,16 @@ public void testWaitQueueComparator() throws InterruptedException { r5 = createTaskWrapper(createRequest(5, 10, 1, 500), true, 1000000); queue = new EvictingPriorityBlockingQueue( new FirstInFirstOutComparator(), 4); - assertNull(queue.offer(r1)); + assertNull(queue.offer(r1, 0)); assertEquals(r1, queue.peek()); - assertNull(queue.offer(r2)); + assertNull(queue.offer(r2, 0)); assertEquals(r1, queue.peek()); - assertNull(queue.offer(r3)); + assertNull(queue.offer(r3, 0)); assertEquals(r1, queue.peek()); - assertNull(queue.offer(r4)); + assertNull(queue.offer(r4, 0)); assertEquals(r1, queue.peek()); // offer accepted and r2 gets evicted - assertEquals(r2, queue.offer(r5)); + assertEquals(r2, queue.offer(r5, 0)); assertEquals(r5, queue.take()); assertEquals(r1, queue.take()); assertEquals(r4, queue.take()); @@ -217,16 +217,16 @@ public void testWaitQueueComparator() throws InterruptedException { r5 = createTaskWrapper(createRequest(5, 10, 1, 500), true, 1000000); queue = new EvictingPriorityBlockingQueue( new FirstInFirstOutComparator(), 4); - assertNull(queue.offer(r1)); + assertNull(queue.offer(r1, 0)); assertEquals(r1, queue.peek()); - assertNull(queue.offer(r2)); + assertNull(queue.offer(r2, 0)); assertEquals(r2, queue.peek()); - assertNull(queue.offer(r3)); + assertNull(queue.offer(r3, 0)); assertEquals(r3, queue.peek()); - assertNull(queue.offer(r4)); + assertNull(queue.offer(r4, 0)); assertEquals(r4, queue.peek()); // offer accepted, r1 evicted - assertEquals(r1, queue.offer(r5)); + assertEquals(r1, queue.offer(r5, 0)); assertEquals(r5, queue.take()); assertEquals(r4, 
queue.take()); assertEquals(r3, queue.take()); @@ -239,16 +239,16 @@ public void testWaitQueueComparator() throws InterruptedException { r5 = createTaskWrapper(createRequest(5, 10, 2, 500), true, 1000000); queue = new EvictingPriorityBlockingQueue( new FirstInFirstOutComparator(), 4); - assertNull(queue.offer(r1)); + assertNull(queue.offer(r1, 0)); assertEquals(r1, queue.peek()); - assertNull(queue.offer(r2)); + assertNull(queue.offer(r2, 0)); assertEquals(r2, queue.peek()); - assertNull(queue.offer(r3)); + assertNull(queue.offer(r3, 0)); assertEquals(r3, queue.peek()); - assertNull(queue.offer(r4)); + assertNull(queue.offer(r4, 0)); assertEquals(r4, queue.peek()); // offer accepted, r1 evicted - assertEquals(r1, queue.offer(r5)); + assertEquals(r1, queue.offer(r5, 0)); assertEquals(r5, queue.take()); assertEquals(r4, queue.take()); assertEquals(r3, queue.take()); @@ -264,9 +264,9 @@ public void testWaitQueueComparatorWithinDagPriority() throws InterruptedExcepti EvictingPriorityBlockingQueue queue = new EvictingPriorityBlockingQueue<>( new FirstInFirstOutComparator(), 4); - assertNull(queue.offer(r1)); - assertNull(queue.offer(r2)); - assertNull(queue.offer(r3)); + assertNull(queue.offer(r1, 0)); + assertNull(queue.offer(r2, 0)); + assertNull(queue.offer(r3, 0)); assertEquals(r2, queue.take()); assertEquals(r3, queue.take()); @@ -282,13 +282,13 @@ public void testWaitQueueComparatorWithinSameDagPriority() throws InterruptedExc EvictingPriorityBlockingQueue queue = new EvictingPriorityBlockingQueue<>( new ShortestJobFirstComparator(), 3); - assertNull(queue.offer(r1)); - assertNull(queue.offer(r2)); - assertNull(queue.offer(r3)); + assertNull(queue.offer(r1, 0)); + assertNull(queue.offer(r2, 0)); + assertNull(queue.offer(r3, 0)); // can not queue more requests as queue is full TaskWrapper r4 = createTaskWrapper(createRequest(4, 1, 0, 10, 100, 10), true, 100000); - assertEquals(r4, queue.offer(r4)); + assertEquals(r4, queue.offer(r4, 0)); } @Test(timeout = 60000) @@ 
-300,9 +300,9 @@ public void testWaitQueueComparatorParallelism() throws InterruptedException { EvictingPriorityBlockingQueue queue = new EvictingPriorityBlockingQueue<>( new FirstInFirstOutComparator(), 4); - assertNull(queue.offer(r1)); - assertNull(queue.offer(r2)); - assertNull(queue.offer(r3)); + assertNull(queue.offer(r1, 0)); + assertNull(queue.offer(r2, 0)); + assertNull(queue.offer(r3, 0)); assertEquals(r2, queue.take()); assertEquals(r3, queue.take()); diff --git llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestShortestJobFirstComparator.java llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestShortestJobFirstComparator.java index 0059d0c..b348bd6 100644 --- llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestShortestJobFirstComparator.java +++ llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestShortestJobFirstComparator.java @@ -35,16 +35,16 @@ public void testWaitQueueComparator() throws InterruptedException { TaskWrapper r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500, 600, "q5"), false, 1000000); EvictingPriorityBlockingQueue queue = new EvictingPriorityBlockingQueue<>( new ShortestJobFirstComparator(), 4); - assertNull(queue.offer(r1)); + assertNull(queue.offer(r1, 0)); assertEquals(r1, queue.peek()); - assertNull(queue.offer(r2)); + assertNull(queue.offer(r2, 0)); assertEquals(r1, queue.peek()); - assertNull(queue.offer(r3)); + assertNull(queue.offer(r3, 0)); assertEquals(r1, queue.peek()); - assertNull(queue.offer(r4)); + assertNull(queue.offer(r4, 0)); assertEquals(r1, queue.peek()); // this offer will be rejected - assertEquals(r5, queue.offer(r5)); + assertEquals(r5, queue.offer(r5, 0)); assertEquals(r1, queue.take()); assertEquals(r2, queue.take()); assertEquals(r3, queue.take()); @@ -57,16 +57,16 @@ public void testWaitQueueComparator() throws InterruptedException { r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500, 
600, "q5"), true, 1000000); queue = new EvictingPriorityBlockingQueue( new ShortestJobFirstComparator(), 4); - assertNull(queue.offer(r1)); + assertNull(queue.offer(r1, 0)); assertEquals(r1, queue.peek()); - assertNull(queue.offer(r2)); + assertNull(queue.offer(r2, 0)); assertEquals(r1, queue.peek()); - assertNull(queue.offer(r3)); + assertNull(queue.offer(r3, 0)); assertEquals(r1, queue.peek()); - assertNull(queue.offer(r4)); + assertNull(queue.offer(r4, 0)); assertEquals(r1, queue.peek()); // this offer will be rejected - assertEquals(r5, queue.offer(r5)); + assertEquals(r5, queue.offer(r5, 0)); assertEquals(r1, queue.take()); assertEquals(r2, queue.take()); assertEquals(r3, queue.take()); @@ -79,16 +79,16 @@ public void testWaitQueueComparator() throws InterruptedException { r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500, 600, "q5"), true, 1000000); queue = new EvictingPriorityBlockingQueue( new ShortestJobFirstComparator(), 4); - assertNull(queue.offer(r1)); + assertNull(queue.offer(r1, 0)); assertEquals(r1, queue.peek()); - assertNull(queue.offer(r2)); + assertNull(queue.offer(r2, 0)); assertEquals(r1, queue.peek()); - assertNull(queue.offer(r3)); + assertNull(queue.offer(r3, 0)); assertEquals(r1, queue.peek()); - assertNull(queue.offer(r4)); + assertNull(queue.offer(r4, 0)); assertEquals(r1, queue.peek()); // offer accepted and r4 gets evicted - assertEquals(r4, queue.offer(r5)); + assertEquals(r4, queue.offer(r5, 0)); assertEquals(r1, queue.take()); assertEquals(r3, queue.take()); assertEquals(r5, queue.take()); @@ -101,16 +101,16 @@ public void testWaitQueueComparator() throws InterruptedException { r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500, 600, "q5"), true, 1000000); queue = new EvictingPriorityBlockingQueue( new ShortestJobFirstComparator(), 4); - assertNull(queue.offer(r1)); + assertNull(queue.offer(r1, 0)); assertEquals(r1, queue.peek()); - assertNull(queue.offer(r2)); + assertNull(queue.offer(r2, 0)); 
assertEquals(r1, queue.peek()); - assertNull(queue.offer(r3)); + assertNull(queue.offer(r3, 0)); assertEquals(r1, queue.peek()); - assertNull(queue.offer(r4)); + assertNull(queue.offer(r4, 0)); assertEquals(r1, queue.peek()); // offer accepted and r4 gets evicted - assertEquals(r4, queue.offer(r5)); + assertEquals(r4, queue.offer(r5, 0)); assertEquals(r1, queue.take()); assertEquals(r3, queue.take()); assertEquals(r5, queue.take()); @@ -123,16 +123,16 @@ public void testWaitQueueComparator() throws InterruptedException { r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500, 600, "q5"), true, 1000000); queue = new EvictingPriorityBlockingQueue( new ShortestJobFirstComparator(), 4); - assertNull(queue.offer(r1)); + assertNull(queue.offer(r1, 0)); assertEquals(r1, queue.peek()); - assertNull(queue.offer(r2)); + assertNull(queue.offer(r2, 0)); assertEquals(r1, queue.peek()); - assertNull(queue.offer(r3)); + assertNull(queue.offer(r3, 0)); assertEquals(r1, queue.peek()); - assertNull(queue.offer(r4)); + assertNull(queue.offer(r4, 0)); assertEquals(r1, queue.peek()); // offer accepted and r4 gets evicted - assertEquals(r4, queue.offer(r5)); + assertEquals(r4, queue.offer(r5, 0)); assertEquals(r1, queue.take()); assertEquals(r5, queue.take()); assertEquals(r2, queue.take()); @@ -145,16 +145,16 @@ public void testWaitQueueComparator() throws InterruptedException { r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500, 600, "q5"), true, 1000000); queue = new EvictingPriorityBlockingQueue( new ShortestJobFirstComparator(), 4); - assertNull(queue.offer(r1)); + assertNull(queue.offer(r1, 0)); assertEquals(r1, queue.peek()); - assertNull(queue.offer(r2)); + assertNull(queue.offer(r2, 0)); assertEquals(r2, queue.peek()); - assertNull(queue.offer(r3)); + assertNull(queue.offer(r3, 0)); assertEquals(r2, queue.peek()); - assertNull(queue.offer(r4)); + assertNull(queue.offer(r4, 0)); assertEquals(r2, queue.peek()); // offer accepted, r1 evicted - assertEquals(r1, 
queue.offer(r5)); + assertEquals(r1, queue.offer(r5, 0)); assertEquals(r2, queue.take()); assertEquals(r3, queue.take()); assertEquals(r4, queue.take()); @@ -170,9 +170,9 @@ public void testWaitQueueComparatorWithinDagPriority() throws InterruptedExcepti EvictingPriorityBlockingQueue queue = new EvictingPriorityBlockingQueue<>( new ShortestJobFirstComparator(), 4); - assertNull(queue.offer(r1)); - assertNull(queue.offer(r2)); - assertNull(queue.offer(r3)); + assertNull(queue.offer(r1, 0)); + assertNull(queue.offer(r2, 0)); + assertNull(queue.offer(r3, 0)); assertEquals(r2, queue.take()); assertEquals(r3, queue.take()); @@ -188,13 +188,13 @@ public void testWaitQueueComparatorWithinSameDagPriority() throws InterruptedExc EvictingPriorityBlockingQueue queue = new EvictingPriorityBlockingQueue<>( new ShortestJobFirstComparator(), 3); - assertNull(queue.offer(r1)); - assertNull(queue.offer(r2)); - assertNull(queue.offer(r3)); + assertNull(queue.offer(r1, 0)); + assertNull(queue.offer(r2, 0)); + assertNull(queue.offer(r3, 0)); // can not queue more requests as queue is full TaskWrapper r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 1, 0, 10, 100, 10), true, 100000); - assertEquals(r4, queue.offer(r4)); + assertEquals(r4, queue.offer(r4, 0)); } @Test(timeout = 60000) @@ -206,9 +206,9 @@ public void testWaitQueueComparatorParallelism() throws InterruptedException { EvictingPriorityBlockingQueue queue = new EvictingPriorityBlockingQueue<>( new ShortestJobFirstComparator(), 4); - assertNull(queue.offer(r1)); - assertNull(queue.offer(r2)); - assertNull(queue.offer(r3)); + assertNull(queue.offer(r1, 0)); + assertNull(queue.offer(r2, 0)); + assertNull(queue.offer(r3, 0)); assertEquals(r2, queue.take()); assertEquals(r3, queue.take()); @@ -224,9 +224,9 @@ public void testWaitQueueComparatorAging() throws InterruptedException { EvictingPriorityBlockingQueue queue = new EvictingPriorityBlockingQueue<>( new ShortestJobFirstComparator(), 4); - assertNull(queue.offer(r1)); - 
assertNull(queue.offer(r2)); - assertNull(queue.offer(r3)); + assertNull(queue.offer(r1, 0)); + assertNull(queue.offer(r2, 0)); + assertNull(queue.offer(r3, 0)); assertEquals(r1, queue.take()); assertEquals(r2, queue.take()); @@ -241,9 +241,9 @@ public void testWaitQueueComparatorAging() throws InterruptedException { queue = new EvictingPriorityBlockingQueue<>(new ShortestJobFirstComparator(), 4); - assertNull(queue.offer(r1)); - assertNull(queue.offer(r2)); - assertNull(queue.offer(r3)); + assertNull(queue.offer(r1, 0)); + assertNull(queue.offer(r2, 0)); + assertNull(queue.offer(r3, 0)); assertEquals(r2, queue.take()); assertEquals(r3, queue.take());