diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/SchedulerFragmentCompletingListener.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/SchedulerFragmentCompletingListener.java new file mode 100644 index 0000000..9f580f6 --- /dev/null +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/SchedulerFragmentCompletingListener.java @@ -0,0 +1,29 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap.daemon; + +public interface SchedulerFragmentCompletingListener { + + enum State { + SUCCESS, FAILED, KILLED + } + + /** + * Indicates that a fragment is about to complete. + * @param fragmentId + */ + void fragmentCompleting(String fragmentId, State state); + +} diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java index 8c33fa2..6908138 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hive.llap.daemon.HistoryLogger; import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler; import org.apache.hadoop.hive.llap.daemon.QueryFailedHandler; +import org.apache.hadoop.hive.llap.daemon.SchedulerFragmentCompletingListener; import org.apache.hadoop.hive.llap.daemon.impl.LlapTokenChecker.LlapTokenInfo; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GroupInputSpecProto; @@ -93,6 +94,7 @@ private final AMReporter amReporter; private final QueryTracker queryTracker; private final Scheduler executorService; + private final SchedulerFragmentCompletingListener completionListener; private final AtomicReference localAddress; private final AtomicReference localShufflePort; private final Map localEnv = new HashMap<>(); @@ -129,6 +131,7 @@ public ContainerRunnerImpl(Configuration conf, int numExecutors, int waitQueueSi conf, ConfVars.LLAP_DAEMON_WAIT_QUEUE_COMPARATOR_CLASS_NAME); this.executorService = new TaskExecutorService(numExecutors, waitQueueSize, waitQueueSchedulerClassName, enablePreemption, classLoader, metrics); + completionListener = (SchedulerFragmentCompletingListener) executorService; addIfService(executorService); @@ -251,7 +254,8 @@ public SubmitWorkResponseProto submitWork(SubmitWorkRequestProto request) throws TaskRunnerCallable callable = new TaskRunnerCallable(request, fragmentInfo, callableConf, new ExecutionContextImpl(localAddress.get().getHostName()), env, credentials, memoryPerExecutor, amReporter, confParams, metrics, killedTaskHandler, - this, tezHadoopShim, attemptId, vertex, initialEvent, taskUgi); + this, tezHadoopShim, attemptId, vertex, initialEvent, taskUgi, + completionListener); submissionState = executorService.schedule(callable); if (LOG.isInfoEnabled()) { diff --git 
llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java index adc86ea..a80bb9b 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java @@ -36,27 +36,28 @@ private final PriorityBlockingDeque deque; private final Comparator comparator; + private final int waitQueueSize; + + private int currentSize = 0; public EvictingPriorityBlockingQueue(Comparator comparator, int maxSize) { - this.deque = new PriorityBlockingDeque<>(comparator, maxSize); + this.deque = new PriorityBlockingDeque<>(comparator); + this.waitQueueSize = maxSize; this.comparator = comparator; } - public synchronized E offer(E e) { - if (deque.offer(e)) { + public synchronized E offer(E e, int additionalElementsAllowed) { + if (currentSize < waitQueueSize + additionalElementsAllowed) { + // Capacity exists. + offerToDequeueInternal(e); + currentSize++; return null; } else { + // No capacity. Check if an element needs to be evicted. E last = deque.peekLast(); if (comparator.compare(e, last) < 0) { deque.removeLast(); - if (!deque.offer(e)) { - LOG.error( - "Failed to insert element into queue with capacity available. size={}, element={}", - size(), e); - throw new RuntimeException( - "Failed to insert element into queue with capacity available. size=" + - size()); - } + offerToDequeueInternal(e); return last; } return e; @@ -64,7 +65,7 @@ public synchronized E offer(E e) { } public synchronized boolean isEmpty() { - return deque.isEmpty(); + return currentSize == 0; } public synchronized E peek() { @@ -72,19 +73,55 @@ public synchronized E peek() { } public synchronized E take() throws InterruptedException { - return deque.take(); + E e = deque.take(); + currentSize--; // Decrement only if an element was removed. + return e; } public synchronized boolean remove(E e) { - return deque.remove(e); + boolean removed = deque.remove(e); + if (removed) { + currentSize--; + } + return removed; + } + + /** + * Re-insert an element if it exists (mainly to force a re-order) + * @param e + * @return false if the element was not found. true otherwise. + */ + public synchronized boolean reinsertIfExists(E e) { + if (remove(e)) { + offerToDequeueInternal(e); + currentSize++; + return true; + } else { + return false; + } + } + + private void offerToDequeueInternal(E e) { + boolean result = deque.offer(e); + if (!result) { + LOG.error( + "Failed to insert element into queue with capacity available. size={}, element={}", + size(), e); + throw new RuntimeException( + "Failed to insert element into queue with capacity available. 
size=" + + size()); + } } public synchronized int size() { - return deque.size(); + return currentSize; } @Override public synchronized String toString() { - return deque.toString(); + StringBuilder sb = new StringBuilder(); + sb.append("currentSize=").append(size()).append(", queue=") + .append(deque.toString()); + return sb.toString(); } } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java index 39b4b0e..2fe1017 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java @@ -35,6 +35,7 @@ import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.hadoop.hive.llap.counters.FragmentCountersMap; +import org.apache.hadoop.hive.llap.daemon.SchedulerFragmentCompletingListener; import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol; import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.TezException; @@ -82,15 +83,19 @@ private final String containerIdStr; private final String fragmentId; private final TezEvent initialEvent; + private final SchedulerFragmentCompletingListener completionListener; + // The same id as reported by TaskRunnerCallable.getRequestId + private final String fragmentRequestId; private final ListeningExecutorService heartbeatExecutor; @VisibleForTesting HeartbeatCallable currentCallable; - public LlapTaskReporter(LlapTaskUmbilicalProtocol umbilical, long amPollInterval, + public LlapTaskReporter(SchedulerFragmentCompletingListener completionListener, LlapTaskUmbilicalProtocol umbilical, long amPollInterval, long sendCounterInterval, int maxEventsToGet, AtomicLong requestCounter, - String containerIdStr, final String fragmentId, TezEvent initialEvent) { + String containerIdStr, final String fragmentId, TezEvent initialEvent, + String fragmentRequestId) { this.umbilical = umbilical; this.pollInterval = amPollInterval; this.sendCounterInterval = sendCounterInterval; @@ -102,6 +107,8 @@ public LlapTaskReporter(LlapTaskUmbilicalProtocol umbilical, long amPollInterval ExecutorService executor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder() .setDaemon(true).setNameFormat("TaskHeartbeatThread").build()); heartbeatExecutor = MoreExecutors.listeningDecorator(executor); + this.completionListener = completionListener; + this.fragmentRequestId = fragmentRequestId; } /** @@ -113,8 +120,8 @@ public synchronized void registerTask(RuntimeTask task, TezCounters tezCounters = task.addAndGetTezCounter(fragmentId); FragmentCountersMap.registerCountersForFragment(fragmentId, tezCounters); LOG.info("Registered counters for fragment: {} vertexName: {}", fragmentId, task.getVertexName()); - currentCallable = new HeartbeatCallable(task, umbilical, pollInterval, sendCounterInterval, - maxEventsToGet, requestCounter, containerIdStr, initialEvent); + currentCallable = new HeartbeatCallable(completionListener, task, umbilical, pollInterval, sendCounterInterval, + maxEventsToGet, requestCounter, containerIdStr, initialEvent, fragmentRequestId); ListenableFuture future = heartbeatExecutor.submit(currentCallable); Futures.addCallback(future, new HeartbeatCallback(errorReporter)); } @@ -144,6 +151,8 @@ public void shutdown() { private final RuntimeTask task; private final EventMetaData updateEventMetadata; + private final SchedulerFragmentCompletingListener completionListener; + private final 
String fragmentRequestId; private final LlapTaskUmbilicalProtocol umbilical; @@ -174,9 +183,12 @@ public void shutdown() { private int prevCounterSendHeartbeatNum = 0; private TezEvent initialEvent; - public HeartbeatCallable(RuntimeTask task, LlapTaskUmbilicalProtocol umbilical, + public HeartbeatCallable( + SchedulerFragmentCompletingListener completionListener, + RuntimeTask task, LlapTaskUmbilicalProtocol umbilical, long amPollInterval, long sendCounterInterval, int maxEventsToGet, - AtomicLong requestCounter, String containerIdStr, TezEvent initialEvent) { + AtomicLong requestCounter, String containerIdStr, + TezEvent initialEvent, String fragmentRequestId) { this.pollInterval = amPollInterval; this.sendCounterInterval = sendCounterInterval; @@ -184,6 +196,8 @@ public HeartbeatCallable(RuntimeTask task, LlapTaskUmbilicalProtocol umbilical, this.requestCounter = requestCounter; this.containerIdStr = containerIdStr; this.initialEvent = initialEvent; + this.completionListener = completionListener; + this.fragmentRequestId = fragmentRequestId; this.task = task; this.umbilical = umbilical; @@ -367,6 +381,10 @@ private boolean taskSucceeded(TezTaskAttemptID taskAttemptID) throws IOException TezEvent statusUpdateEvent = new TezEvent(getStatusUpdateEvent(true), updateEventMetadata); TezEvent taskCompletedEvent = new TezEvent(new TaskAttemptCompletedEvent(), updateEventMetadata); + if (LOG.isDebugEnabled()) { + LOG.debug("Invoking OOB heartbeat for successful attempt: {}, isTaskDone={}", taskAttemptID, task.isTaskDone()); + } + completionListener.fragmentCompleting(fragmentRequestId, SchedulerFragmentCompletingListener.State.SUCCESS); return !heartbeat(Lists.newArrayList(statusUpdateEvent, taskCompletedEvent)).shouldDie; } else { LOG.warn("A final task state event has already been sent. Not sending again"); @@ -431,6 +449,14 @@ private boolean taskTerminated(TezTaskAttemptID taskAttemptID, boolean isKilled, // Counter may exceed limitation LOG.warn("Error when get constructing TaskStatusUpdateEvent. Not sending it out"); } + if (LOG.isDebugEnabled()) { + LOG.debug( + "Invoking OOB heartbeat for failed/killed attempt: {}, isTaskDone={}, isKilled={}", + taskAttemptID, task.isTaskDone(), isKilled); + } + completionListener.fragmentCompleting(fragmentRequestId, + isKilled ? SchedulerFragmentCompletingListener.State.KILLED : + SchedulerFragmentCompletingListener.State.FAILED); return !heartbeat(tezEvents).shouldDie; } else { LOG.warn("A final task state event has already been sent. 
Not sending again"); diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/PriorityBlockingDeque.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/PriorityBlockingDeque.java index e27efa5..3bf51cd 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/PriorityBlockingDeque.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/PriorityBlockingDeque.java @@ -81,6 +81,10 @@ public PriorityBlockingDeque(int capacity) { this(null, capacity); } + public PriorityBlockingDeque(Comparator comparator) { + this(comparator, Integer.MAX_VALUE); + } + public PriorityBlockingDeque(Comparator comparator, int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java index 58863af..cae0b06 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java @@ -42,10 +42,13 @@ import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.hadoop.hive.llap.daemon.FinishableStateUpdateHandler; +import org.apache.hadoop.hive.llap.daemon.SchedulerFragmentCompletingListener; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec; import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics; +import org.apache.hadoop.hive.llap.tezplugins.helpers.MonotonicClock; import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.util.Clock; import org.apache.tez.runtime.task.EndReason; import org.apache.tez.runtime.task.TaskRunner2Result; import org.slf4j.Logger; @@ -78,7 +81,8 @@ * Task executor service can be shut down which will terminated all running tasks and reject all * new tasks. Shutting down of the task executor service can be done gracefully or immediately. */ -public class TaskExecutorService extends AbstractService implements Scheduler { +public class TaskExecutorService extends AbstractService + implements Scheduler, SchedulerFragmentCompletingListener { private static final Logger LOG = LoggerFactory.getLogger(TaskExecutorService.class); private static final boolean isInfoEnabled = LOG.isInfoEnabled(); private static final boolean isDebugEnabled = LOG.isDebugEnabled(); @@ -89,7 +93,8 @@ // Thread pool for actual execution of work. private final ListeningExecutorService executorService; - private final EvictingPriorityBlockingQueue waitQueue; + @VisibleForTesting + final EvictingPriorityBlockingQueue waitQueue; // Thread pool for taking entities off the wait queue. private final ListeningExecutorService waitQueueExecutorService; // Thread pool for callbacks on completion of execution of a work unit. @@ -100,6 +105,13 @@ private final boolean enablePreemption; private final ThreadPoolExecutor threadPoolExecutor; private final AtomicInteger numSlotsAvailable; + private final int maxParallelExecutors; + private final Clock clock = new MonotonicClock(); + + // Tracks running fragments, and completing fragments. + // Completing since we have a race in the AM being notified and the task actually + // falling off, and the executor service being ready to schedule a new task. 
+ private final AtomicInteger runningFragmentCount = new AtomicInteger(0); @VisibleForTesting @@ -121,6 +133,7 @@ public TaskExecutorService(int numExecutors, int waitQueueSize, final Comparator waitQueueComparator = createComparator( waitQueueComparatorClassName); + this.maxParallelExecutors = numExecutors; this.waitQueue = new EvictingPriorityBlockingQueue<>(waitQueueComparator, waitQueueSize); this.threadPoolExecutor = new ThreadPoolExecutor(numExecutors, // core pool size numExecutors, // max pool size @@ -344,8 +357,12 @@ public SubmissionState schedule(TaskRunnerCallable task) { // TODO HIVE-11687 It's possible for a bunch of tasks to come in around the same time, without the // actual executor threads picking up any work. This will lead to unnecessary rejection of tasks. // The wait queue should be able to fit at least (waitQueue + currentFreeExecutor slots) + LOG.info( + "DBG: Offering to wait queue with: waitQueueSize={}, numSlotsAvailable={}, runningFragmentCount={} ", + waitQueue.size(), numSlotsAvailable.get(), + runningFragmentCount.get()); canFinish = taskWrapper.getTaskRunnerCallable().canFinish(); - evictedTask = waitQueue.offer(taskWrapper); + evictedTask = waitQueue.offer(taskWrapper, maxParallelExecutors - runningFragmentCount.get()); // Finishable state is checked on the task, via an explicit query to the TaskRunnerCallable // null evicted task means offer accepted @@ -366,7 +383,9 @@ public SubmissionState schedule(TaskRunnerCallable task) { } } else { if (isInfoEnabled) { - LOG.info("wait queue full, size={}. {} not added", waitQueue.size(), task.getRequestId()); + LOG.info( + "wait queue full, size={}. numSlotsAvailable={}. {} not added", + waitQueue.size(), numSlotsAvailable.get(), task.getRequestId()); } evictedTask.getTaskRunnerCallable().killTask(); @@ -473,6 +492,34 @@ public void killFragment(String fragmentId) { } } + private static final class FragmentCompletion { + + public FragmentCompletion( + State state, long completingTime) { + this.state = state; + this.completingTime = completingTime; + } + + State state; + long completingTime; + } + + @VisibleForTesting + final ConcurrentMap + completingFragmentMap = new ConcurrentHashMap<>(); + + @Override + public void fragmentCompleting(String fragmentId, State state) { + int count = runningFragmentCount.decrementAndGet(); + if (count < 0) { + LOG.warn( + "RunningFragmentCount went negative. Multiple calls for the same completion. Resetting to 0"); + runningFragmentCount.set(0); + } + completingFragmentMap + .put(fragmentId, new FragmentCompletion(state, clock.getTime())); + } + @VisibleForTesting void trySchedule(final TaskWrapper taskWrapper) throws RejectedExecutionException { @@ -481,6 +528,7 @@ void trySchedule(final TaskWrapper taskWrapper) throws RejectedExecutionExceptio LOG.info("Attempting to execute {}", taskWrapper); ListenableFuture future = executorService.submit( taskWrapper.getTaskRunnerCallable()); + runningFragmentCount.incrementAndGet(); taskWrapper.setIsInWaitQueue(false); FutureCallback wrappedCallback = createInternalCompletionListener( taskWrapper); @@ -547,10 +595,8 @@ private void finishableStateUpdated(TaskWrapper taskWrapper, boolean newFinishab // Re-order the wait queue LOG.debug("Re-ordering the wait queue since {} finishable state moved to {}", taskWrapper.getRequestId(), newFinishableState); - if (waitQueue.remove(taskWrapper)) { - // Put element back only if it existed. 
- waitQueue.offer(taskWrapper); - } else { + boolean reInserted = waitQueue.reinsertIfExists(taskWrapper); + if (!reInserted) { LOG.warn("Failed to remove {} from waitQueue", taskWrapper.getTaskRunnerCallable().getRequestId()); } @@ -653,6 +699,8 @@ public InternalCompletionListener(TaskWrapper taskWrapper) { @Override public void onSuccess(TaskRunner2Result result) { + LOG.info("DBG: Received successful completion for: {}", taskWrapper.getRequestId()); + updateFallOffStats(taskWrapper.getRequestId()); knownTasks.remove(taskWrapper.getRequestId()); taskWrapper.setIsInPreemptableQueue(false); taskWrapper.maybeUnregisterForFinishedStateNotifications(); @@ -662,6 +710,8 @@ public void onSuccess(TaskRunner2Result result) { @Override public void onFailure(Throwable t) { + LOG.info("DBG: Received failed completion for: {}", taskWrapper.getRequestId()); + updateFallOffStats(taskWrapper.getRequestId()); knownTasks.remove(taskWrapper.getRequestId()); taskWrapper.setIsInPreemptableQueue(false); taskWrapper.maybeUnregisterForFinishedStateNotifications(); @@ -698,6 +748,44 @@ private void updatePreemptionListAndNotify(EndReason reason) { } } + private void updateFallOffStats( + String requestId) { + long now = clock.getTime(); + FragmentCompletion fragmentCompletion = + completingFragmentMap.remove(requestId); + if (fragmentCompletion == null) { + LOG.warn( + "Received onSuccess/onFailure for a fragment for which a completing message was not received: {}", + requestId); + // Happens due to AM side pre-emption, or the AM asking for a task to die. + // There's no hooks at the moment to get information over. + // For now - decrement the count to avoid accounting errors. + runningFragmentCount.decrementAndGet(); + // TODO: Extend TaskRunner2 or see if an API with callbacks will work + } else { + long timeTaken = now - fragmentCompletion.completingTime; + LOG.info("DBG: FallOff time taken for {} in state {} = {}ms", requestId, + fragmentCompletion.state, timeTaken); + switch (fragmentCompletion.state) { + case SUCCESS: + if (metrics != null) { + metrics.addMetricsFallOffSuccessTimeLost(timeTaken); + } + break; + case FAILED: + if (metrics != null) { + metrics.addMetricsFallOffFailedTimeLost(timeTaken); + } + break; + case KILLED: + if (metrics != null) { + metrics.addMetricsFallOffKilledTimeLost(timeTaken); + } + break; + } + } + } + } public void shutDown(boolean awaitTermination) { diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java index bfb155a..4b677aa 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler; import org.apache.hadoop.hive.llap.daemon.HistoryLogger; import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler; +import org.apache.hadoop.hive.llap.daemon.SchedulerFragmentCompletingListener; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec; @@ -114,16 +115,17 @@ private final AtomicBoolean killInvoked = new AtomicBoolean(false); private final SignableVertexSpec vertex; private final TezEvent initialEvent; + private final 
SchedulerFragmentCompletingListener completionListener; private UserGroupInformation taskUgi; @VisibleForTesting public TaskRunnerCallable(SubmitWorkRequestProto request, QueryFragmentInfo fragmentInfo, - Configuration conf, ExecutionContext executionContext, Map envMap, - Credentials credentials, long memoryAvailable, AMReporter amReporter, ConfParams confParams, - LlapDaemonExecutorMetrics metrics, KilledTaskHandler killedTaskHandler, - FragmentCompletionHandler fragmentCompleteHandler, HadoopShim tezHadoopShim, - TezTaskAttemptID attemptId, SignableVertexSpec vertex, TezEvent initialEvent, - UserGroupInformation taskUgi) { + Configuration conf, ExecutionContext executionContext, Map envMap, + Credentials credentials, long memoryAvailable, AMReporter amReporter, ConfParams confParams, + LlapDaemonExecutorMetrics metrics, KilledTaskHandler killedTaskHandler, + FragmentCompletionHandler fragmentCompleteHandler, HadoopShim tezHadoopShim, + TezTaskAttemptID attemptId, SignableVertexSpec vertex, TezEvent initialEvent, + UserGroupInformation taskUgi, SchedulerFragmentCompletingListener completionListener) { this.request = request; this.fragmentInfo = fragmentInfo; this.conf = conf; @@ -152,6 +154,7 @@ public TaskRunnerCallable(SubmitWorkRequestProto request, QueryFragmentInfo frag this.tezHadoopShim = tezHadoopShim; this.initialEvent = initialEvent; this.taskUgi = taskUgi; + this.completionListener = completionListener; } public long getStartTime() { @@ -219,6 +222,7 @@ public LlapTaskUmbilicalProtocol run() throws Exception { String fragmentId = LlapTezUtils.stripAttemptPrefix(taskSpec.getTaskAttemptID().toString()); taskReporter = new LlapTaskReporter( + completionListener, umbilical, confParams.amHeartbeatIntervalMsMax, confParams.amCounterHeartbeatInterval, @@ -226,7 +230,8 @@ public LlapTaskUmbilicalProtocol run() throws Exception { new AtomicLong(0), request.getContainerIdString(), fragmentId, - initialEvent); + initialEvent, + requestId); String attemptId = fragmentInfo.getFragmentIdentifierString(); IOContextMap.setThreadAttemptId(attemptId); @@ -297,9 +302,14 @@ public void killTask() { killtimerWatch.start(); LOG.info("Issuing kill to task {}", taskSpec.getTaskAttemptID()); boolean killed = taskRunner.killTask(); + if (killed) { // Sending a kill message to the AM right here. Don't need to wait for the task to complete. LOG.info("Kill request for task {} completed. Informing AM", ta); + // Inform the scheduler that this fragment has been killed. 
+ // If the kill failed - that means the task has already hit a final condition, + // and a notification comes from the LlapTaskReporter + completionListener.fragmentCompleting(getRequestId(), SchedulerFragmentCompletingListener.State.KILLED); reportTaskKilled(); } else { LOG.info("Kill request for task {} did not complete because the task is already complete", diff --git llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java index db5fd4f..1e3e1d5 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java @@ -51,7 +51,15 @@ ExecutorPercentileTimeLost("Percentile cluster time wasted due to pre-emption"), ExecutorMaxPreemptionTimeToKill("Max time for killing pre-empted task"), ExecutorMaxPreemptionTimeLost("Max cluster time lost due to pre-emption"), - ExecutorTotalEvictedFromWaitQueue("Total number of tasks evicted from wait queue because of low priority"); + ExecutorTotalEvictedFromWaitQueue("Total number of tasks evicted from wait queue because of low priority"), + ExecutorFallOffSuccessTimeLost("Total time lost in an executor completing after informing the AM - successful fragments"), + ExecutorFallOffSuccessMaxTimeLost("Max value of time lost in an executor completing after informing the AM - successful fragments"), + ExecutorFallOffFailedTimeLost("Total time lost in an executor completing after informing the AM - failed fragments"), + ExecutorFallOffFailedMaxTimeLost("Max value of time lost in an executor completing after informing the AM - failed fragments"), + ExecutorFallOffKilledTimeLost("Total time lost in an executor completing after informing the AM - killed fragments"), + ExecutorFallOffKilledMaxTimeLost("Max value of time lost in an executor completing after informing the AM - killed fragments"), + ExecutorFallOffNumCompletedFragments("Number of completed fragments w.r.t. falloff values"), + ; private final String desc; diff --git llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorMetrics.java llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorMetrics.java index 92c8913..7a0ecc9 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorMetrics.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorMetrics.java @@ -20,6 +20,7 @@ import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorAvailableFreeSlots; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorAvailableFreeSlotsPercent; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorCacheMemoryPerInstance; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorFallOffNumCompletedFragments; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorJvmMaxMemory; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorMaxFreeSlots; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorMaxPreemptionTimeLost; @@ -41,6 +42,12 @@ import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorTotalPreemptionTimeLost; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorTotalPreemptionTimeToKill; import static 
org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorWaitQueueSize; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorFallOffSuccessTimeLost; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorFallOffSuccessMaxTimeLost; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorFallOffFailedTimeLost; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorFallOffFailedMaxTimeLost; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorFallOffKilledTimeLost; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorFallOffKilledMaxTimeLost; import static org.apache.hadoop.metrics2.impl.MsInfo.ProcessName; import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId; @@ -83,6 +90,10 @@ private long maxTimeLost = Long.MIN_VALUE; private long maxTimeToKill = Long.MIN_VALUE; + private long fallOffMaxSuccessTimeLostLong = 0L; + private long fallOffMaxFailedTimeLostLong = 0L; + private long fallOffMaxKilledTimeLostLong = 0L; + private final Map executorNames; final MutableGaugeLong[] executorThreadCpuTime; @@ -126,6 +137,23 @@ @Metric final MutableQuantiles[] percentileTimeLost; + @Metric + MutableCounterLong fallOffNumCompletedFragments; + @Metric + MutableCounterLong fallOffSuccessTimeLost; + @Metric + MutableCounterLong fallOffFailedTimeLost; + @Metric + MutableCounterLong fallOffKilledTimeLost; + @Metric + MutableGaugeLong fallOffMaxSuccessTimeLost; + @Metric + MutableGaugeLong fallOffMaxFailedTimeLost; + @Metric + MutableGaugeLong fallOffMaxKilledTimeLost; + + + private LlapDaemonExecutorMetrics(String displayName, JvmMetrics jm, String sessionId, int numExecutors, final int[] intervals) { this.name = displayName; @@ -244,6 +272,33 @@ public void addMetricsPreemptionTimeToKill(long value) { } } + public void addMetricsFallOffSuccessTimeLost(long timeLost) { + fallOffNumCompletedFragments.incr(); + fallOffSuccessTimeLost.incr(timeLost); + if (timeLost > fallOffMaxSuccessTimeLostLong) { + fallOffMaxSuccessTimeLostLong = timeLost; + fallOffMaxSuccessTimeLost.set(timeLost); + } + } + + public void addMetricsFallOffFailedTimeLost(long timeLost) { + fallOffNumCompletedFragments.incr(); + fallOffFailedTimeLost.incr(timeLost); + if (timeLost > fallOffMaxFailedTimeLostLong) { + fallOffMaxFailedTimeLostLong = timeLost; + fallOffMaxFailedTimeLost.set(timeLost); + } + } + + public void addMetricsFallOffKilledTimeLost(long timeLost) { + fallOffNumCompletedFragments.incr(); + fallOffKilledTimeLost.incr(timeLost); + if (timeLost > fallOffMaxKilledTimeLostLong) { + fallOffMaxKilledTimeLostLong = timeLost; + fallOffMaxKilledTimeLost.set(timeLost); + } + } + public void incrExecutorTotalKilled() { executorTotalIKilled.incr(); } @@ -292,7 +347,14 @@ private void getExecutorStats(MetricsRecordBuilder rb) { .addCounter(ExecutorTotalPreemptionTimeToKill, totalPreemptionTimeToKill.value()) .addCounter(ExecutorTotalPreemptionTimeLost, totalPreemptionTimeLost.value()) .addGauge(ExecutorMaxPreemptionTimeToKill, maxPreemptionTimeToKill.value()) - .addGauge(ExecutorMaxPreemptionTimeLost, maxPreemptionTimeLost.value()); + .addGauge(ExecutorMaxPreemptionTimeLost, maxPreemptionTimeLost.value()) + .addCounter(ExecutorFallOffSuccessTimeLost, fallOffSuccessTimeLost.value()) + .addGauge(ExecutorFallOffSuccessMaxTimeLost, fallOffMaxSuccessTimeLost.value()) + .addCounter(ExecutorFallOffFailedTimeLost, fallOffFailedTimeLost.value()) + 
.addGauge(ExecutorFallOffFailedMaxTimeLost, fallOffMaxFailedTimeLost.value()) + .addCounter(ExecutorFallOffKilledTimeLost, fallOffKilledTimeLost.value()) + .addGauge(ExecutorFallOffKilledMaxTimeLost, fallOffMaxKilledTimeLost.value()) + .addCounter(ExecutorFallOffNumCompletedFragments, fallOffNumCompletedFragments.value()); for (MutableQuantiles q : percentileTimeToKill) { q.snapshot(rb, true); diff --git llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java index 6506d07..5dc1be5 100644 --- llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java +++ llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java @@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler; import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler; +import org.apache.hadoop.hive.llap.daemon.SchedulerFragmentCompletingListener; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec; @@ -54,6 +55,18 @@ public static MockRequest createMockRequest(int fragmentNum, int parallelism, lo return createMockRequest(canFinish, workTime, request); } + public static MockRequest createMockRequest(int fragmentNum, int parallelism, + int withinDagPriority, + long firstAttemptStartTime, + long currentAttemptStartTime, + boolean canFinish, + long workTime) { + SubmitWorkRequestProto + request = createSubmitWorkRequestProto(fragmentNum, parallelism, 0, + firstAttemptStartTime, currentAttemptStartTime, withinDagPriority); + return createMockRequest(canFinish, workTime, request); + } + private static MockRequest createMockRequest(boolean canFinish, long workTime, SubmitWorkRequestProto request) { QueryFragmentInfo queryFragmentInfo = createQueryFragmentInfo( @@ -170,7 +183,8 @@ public MockRequest(SubmitWorkRequestProto requestProto, QueryFragmentInfo fragme LlapDaemonExecutorMetrics.class), mock(KilledTaskHandler.class), mock( FragmentCompletionHandler.class), new DefaultHadoopShim(), null, - requestProto.getWorkSpec().getVertex(), initialEvent, null); + requestProto.getWorkSpec().getVertex(), initialEvent, null, mock( + SchedulerFragmentCompletingListener.class)); this.workTime = workTime; this.canFinish = canFinish; } @@ -285,4 +299,5 @@ private static void logInfo(String message) { logInfo(message, null); } + } diff --git llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestEvictingPriorityBlockingQueue.java llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestEvictingPriorityBlockingQueue.java new file mode 100644 index 0000000..62407b5 --- /dev/null +++ llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestEvictingPriorityBlockingQueue.java @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.llap.daemon.impl; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.util.Comparator; + +import org.junit.Test; + +public class TestEvictingPriorityBlockingQueue { + + @Test (timeout = 10000) + public void test() throws InterruptedException { + Element e; + EvictingPriorityBlockingQueue queue = new EvictingPriorityBlockingQueue<>(new ElementComparator(), 3); + + Element[] elements = new Element[10]; + for (int i = 0 ; i < elements.length ; i++) { + elements[i] = new Element(i); + } + + assertNull(queue.offer(elements[0], 0)); + assertNull(queue.offer(elements[1], 0)); + assertNull(queue.offer(elements[2], 0)); + e = queue.offer(elements[3], 0); + assertEquals(elements[0], e); + + e = queue.offer(elements[0], 0); + assertEquals(elements[0], e); + // 1,2,3 + + e = queue.offer(elements[4], 0); + assertEquals(elements[1], e); + //2,3,4 + + e = queue.offer(elements[1], 1); + assertNull(e); + assertEquals(4, queue.size()); + // 1,2,3,4 + + e = queue.take(); + assertEquals(elements[4], e); //Highest priority at this point should have come out. + //1,2,3 + + e = queue.offer(elements[4], 1); + assertNull(e); + //1,2,3,4 + + e = queue.offer(elements[0], 1); + assertEquals(elements[0], e); // Rejected + //1,2,3,4 + + assertTrue(queue.reinsertIfExists(elements[2])); + assertEquals(4, queue.size()); + + assertFalse(queue.reinsertIfExists(elements[5])); + assertEquals(4, queue.size()); + + //1,2,3,4 + + e = queue.offer(elements[5], 1); + assertEquals(elements[1], e); + //2,3,4,5 + + assertNull(queue.offer(elements[1], 2)); + + assertNull(queue.offer(elements[6], 5)); + assertNull(queue.offer(elements[7], 5)); + //1,2,3,4,5,6,7 + + assertEquals(7, queue.size()); + } + + private static class Element { + public Element(int x) { + this.x = x; + } + + int x; + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + Element element = (Element) o; + + return x == element.x; + } + + @Override + public int hashCode() { + return x; + } + } + + private static class ElementComparator implements Comparator { + + @Override + public int compare(Element o1, + Element o2) { + return o2.x - o1.x; + } + } +} diff --git llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java index de7f2fc..bf7d1d8 100644 --- llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java +++ llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java @@ -39,6 +39,7 @@ import java.util.concurrent.locks.ReentrantLock; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.llap.daemon.SchedulerFragmentCompletingListener; import org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorService.TaskWrapper; import org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorTestHelpers.MockRequest; import 
org.apache.hadoop.hive.llap.daemon.impl.comparator.ShortestJobFirstComparator; @@ -174,12 +175,65 @@ public void testPreemptionStateOnTaskMoveToNonFinishableState() throws Interrupt } } + // Tests wait queue behaviour for fragments which have reported to the AM, but have not given up their executor slot. + @Test (timeout = 10000) + public void testWaitQueueAcceptAfterAMTaskReport() throws + InterruptedException { + + TaskExecutorServiceForTest taskExecutorService = + new TaskExecutorServiceForTest(1, 2, ShortestJobFirstComparator.class.getName(), true); + + // Fourth is lower priority as a result of canFinish being set to false. + MockRequest r1 = createMockRequest(1, 1, 100, 200, true, 20000l); + MockRequest r2 = createMockRequest(2, 1, 1, 200, 2000, true, 20000l); + MockRequest r3 = createMockRequest(3, 1, 2, 300, 420, true, 20000l); + MockRequest r4 = createMockRequest(4, 1, 3, 400, 510, false, 20000l); + + taskExecutorService.init(new Configuration()); + taskExecutorService.start(); + try { + Scheduler.SubmissionState submissionState; + submissionState = taskExecutorService.schedule(r1); + assertEquals(Scheduler.SubmissionState.ACCEPTED, submissionState); + r1.awaitStart(); + + submissionState = taskExecutorService.schedule(r2); + assertEquals(Scheduler.SubmissionState.ACCEPTED, submissionState); + + submissionState = taskExecutorService.schedule(r3); + assertEquals(Scheduler.SubmissionState.ACCEPTED, submissionState); + + submissionState = taskExecutorService.schedule(r4); + assertEquals(Scheduler.SubmissionState.REJECTED, submissionState); + + // Mark a fragment as completing, but don't actually complete it yet. + // The wait queue should now have capacity to accept one more fragment. + taskExecutorService.fragmentCompleting(r1.getRequestId(), + SchedulerFragmentCompletingListener.State.SUCCESS); + + submissionState = taskExecutorService.schedule(r4); + assertEquals(Scheduler.SubmissionState.ACCEPTED, submissionState); + + assertEquals(3, taskExecutorService.waitQueue.size()); + assertEquals(1, taskExecutorService.completingFragmentMap.size()); + + r1.complete(); + r1.awaitEnd(); + // r2 can only start once 1 fragment has completed. the map should be clear at this point. + awaitStartAndSchedulerRun(r2, taskExecutorService); + assertEquals(0, taskExecutorService.completingFragmentMap.size()); + + } finally { + taskExecutorService.shutDown(false); + } + } + @Test(timeout = 10000) public void testWaitQueuePreemption() throws InterruptedException { MockRequest r1 = createMockRequest(1, 1, 100, 200, true, 20000l); - MockRequest r2 = createMockRequest(2, 1, 200, 330, false, 20000l); - MockRequest r3 = createMockRequest(3, 1, 300, 420, false, 20000l); - MockRequest r4 = createMockRequest(4, 1, 400, 510, false, 20000l); + MockRequest r2 = createMockRequest(2, 1, 1,200, 330, false, 20000l); + MockRequest r3 = createMockRequest(3, 2, 2,300, 420, false, 20000l); + MockRequest r4 = createMockRequest(4, 1, 3,400, 510, false, 20000l); MockRequest r5 = createMockRequest(5, 1, 500, 610, true, 20000l); TaskExecutorServiceForTest taskExecutorService = @@ -190,8 +244,7 @@ public void testWaitQueuePreemption() throws InterruptedException { try { taskExecutorService.schedule(r1); - // TODO HIVE-11687. Remove the awaitStart once offer can handle (waitQueueSize + numFreeExecutionSlots) - // This currently serves to allow the task to be removed from the waitQueue. + // 1 scheduling run will happen, which may or may not pick up this task in the test.. 
awaitStartAndSchedulerRun(r1, taskExecutorService); Scheduler.SubmissionState submissionState = taskExecutorService.schedule(r2); assertEquals(Scheduler.SubmissionState.ACCEPTED, submissionState); diff --git llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java index 8cce0cb..79c2564 100644 --- llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java +++ llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java @@ -107,16 +107,16 @@ public void testWaitQueueComparator() throws InterruptedException { TaskWrapper r5 = createTaskWrapper(createRequest(5, 10, 1, 500), false, 1000000); EvictingPriorityBlockingQueue queue = new EvictingPriorityBlockingQueue<>( new FirstInFirstOutComparator(), 4); - assertNull(queue.offer(r1)); + assertNull(queue.offer(r1, 0)); assertEquals(r1, queue.peek()); - assertNull(queue.offer(r2)); + assertNull(queue.offer(r2, 0)); assertEquals(r2, queue.peek()); - assertNull(queue.offer(r3)); + assertNull(queue.offer(r3, 0)); assertEquals(r3, queue.peek()); - assertNull(queue.offer(r4)); + assertNull(queue.offer(r4, 0)); assertEquals(r4, queue.peek()); // this offer will be accepted and r1 evicted - assertEquals(r1, queue.offer(r5)); + assertEquals(r1, queue.offer(r5, 0)); assertEquals(r5, queue.take()); assertEquals(r4, queue.take()); assertEquals(r3, queue.take()); @@ -129,16 +129,16 @@ public void testWaitQueueComparator() throws InterruptedException { r5 = createTaskWrapper(createRequest(5, 10, 1, 500), true, 1000000); queue = new EvictingPriorityBlockingQueue( new FirstInFirstOutComparator(), 4); - assertNull(queue.offer(r1)); + assertNull(queue.offer(r1, 0)); assertEquals(r1, queue.peek()); - assertNull(queue.offer(r2)); + assertNull(queue.offer(r2, 0)); assertEquals(r2, queue.peek()); - assertNull(queue.offer(r3)); + assertNull(queue.offer(r3, 0)); assertEquals(r3, queue.peek()); - assertNull(queue.offer(r4)); + assertNull(queue.offer(r4, 0)); assertEquals(r4, queue.peek()); // this offer will be accpeted and r1 evicted - assertEquals(r1, queue.offer(r5)); + assertEquals(r1, queue.offer(r5, 0)); assertEquals(r5, queue.take()); assertEquals(r4, queue.take()); assertEquals(r3, queue.take()); @@ -151,16 +151,16 @@ public void testWaitQueueComparator() throws InterruptedException { r5 = createTaskWrapper(createRequest(5, 10, 1, 500), true, 1000000); queue = new EvictingPriorityBlockingQueue( new FirstInFirstOutComparator(), 4); - assertNull(queue.offer(r1)); + assertNull(queue.offer(r1, 0)); assertEquals(r1, queue.peek()); - assertNull(queue.offer(r2)); + assertNull(queue.offer(r2, 0)); assertEquals(r1, queue.peek()); - assertNull(queue.offer(r3)); + assertNull(queue.offer(r3, 0)); assertEquals(r3, queue.peek()); - assertNull(queue.offer(r4)); + assertNull(queue.offer(r4, 0)); assertEquals(r3, queue.peek()); // offer accepted and r2 gets evicted - assertEquals(r2, queue.offer(r5)); + assertEquals(r2, queue.offer(r5, 0)); assertEquals(r5, queue.take()); assertEquals(r3, queue.take()); assertEquals(r1, queue.take()); @@ -173,16 +173,16 @@ public void testWaitQueueComparator() throws InterruptedException { r5 = createTaskWrapper(createRequest(5, 10, 1, 500), true, 1000000); queue = new EvictingPriorityBlockingQueue( new FirstInFirstOutComparator(), 4); - assertNull(queue.offer(r1)); + assertNull(queue.offer(r1, 0)); 
assertEquals(r1, queue.peek()); - assertNull(queue.offer(r2)); + assertNull(queue.offer(r2, 0)); assertEquals(r1, queue.peek()); - assertNull(queue.offer(r3)); + assertNull(queue.offer(r3, 0)); assertEquals(r3, queue.peek()); - assertNull(queue.offer(r4)); + assertNull(queue.offer(r4, 0)); assertEquals(r3, queue.peek()); // offer accepted and r2 gets evicted - assertEquals(r2, queue.offer(r5)); + assertEquals(r2, queue.offer(r5, 0)); assertEquals(r5, queue.take()); assertEquals(r3, queue.take()); assertEquals(r1, queue.take()); @@ -195,16 +195,16 @@ public void testWaitQueueComparator() throws InterruptedException { r5 = createTaskWrapper(createRequest(5, 10, 1, 500), true, 1000000); queue = new EvictingPriorityBlockingQueue( new FirstInFirstOutComparator(), 4); - assertNull(queue.offer(r1)); + assertNull(queue.offer(r1, 0)); assertEquals(r1, queue.peek()); - assertNull(queue.offer(r2)); + assertNull(queue.offer(r2, 0)); assertEquals(r1, queue.peek()); - assertNull(queue.offer(r3)); + assertNull(queue.offer(r3, 0)); assertEquals(r1, queue.peek()); - assertNull(queue.offer(r4)); + assertNull(queue.offer(r4, 0)); assertEquals(r1, queue.peek()); // offer accepted and r2 gets evicted - assertEquals(r2, queue.offer(r5)); + assertEquals(r2, queue.offer(r5, 0)); assertEquals(r5, queue.take()); assertEquals(r1, queue.take()); assertEquals(r4, queue.take()); @@ -217,16 +217,16 @@ public void testWaitQueueComparator() throws InterruptedException { r5 = createTaskWrapper(createRequest(5, 10, 1, 500), true, 1000000); queue = new EvictingPriorityBlockingQueue( new FirstInFirstOutComparator(), 4); - assertNull(queue.offer(r1)); + assertNull(queue.offer(r1, 0)); assertEquals(r1, queue.peek()); - assertNull(queue.offer(r2)); + assertNull(queue.offer(r2, 0)); assertEquals(r2, queue.peek()); - assertNull(queue.offer(r3)); + assertNull(queue.offer(r3, 0)); assertEquals(r3, queue.peek()); - assertNull(queue.offer(r4)); + assertNull(queue.offer(r4, 0)); assertEquals(r4, queue.peek()); // offer accepted, r1 evicted - assertEquals(r1, queue.offer(r5)); + assertEquals(r1, queue.offer(r5, 0)); assertEquals(r5, queue.take()); assertEquals(r4, queue.take()); assertEquals(r3, queue.take()); @@ -239,16 +239,16 @@ public void testWaitQueueComparator() throws InterruptedException { r5 = createTaskWrapper(createRequest(5, 10, 2, 500), true, 1000000); queue = new EvictingPriorityBlockingQueue( new FirstInFirstOutComparator(), 4); - assertNull(queue.offer(r1)); + assertNull(queue.offer(r1, 0)); assertEquals(r1, queue.peek()); - assertNull(queue.offer(r2)); + assertNull(queue.offer(r2, 0)); assertEquals(r2, queue.peek()); - assertNull(queue.offer(r3)); + assertNull(queue.offer(r3, 0)); assertEquals(r3, queue.peek()); - assertNull(queue.offer(r4)); + assertNull(queue.offer(r4, 0)); assertEquals(r4, queue.peek()); // offer accepted, r1 evicted - assertEquals(r1, queue.offer(r5)); + assertEquals(r1, queue.offer(r5, 0)); assertEquals(r5, queue.take()); assertEquals(r4, queue.take()); assertEquals(r3, queue.take()); @@ -264,9 +264,9 @@ public void testWaitQueueComparatorWithinDagPriority() throws InterruptedExcepti EvictingPriorityBlockingQueue queue = new EvictingPriorityBlockingQueue<>( new FirstInFirstOutComparator(), 4); - assertNull(queue.offer(r1)); - assertNull(queue.offer(r2)); - assertNull(queue.offer(r3)); + assertNull(queue.offer(r1, 0)); + assertNull(queue.offer(r2, 0)); + assertNull(queue.offer(r3, 0)); assertEquals(r2, queue.take()); assertEquals(r3, queue.take()); @@ -282,13 +282,13 @@ public void 
testWaitQueueComparatorWithinSameDagPriority() throws InterruptedExc EvictingPriorityBlockingQueue queue = new EvictingPriorityBlockingQueue<>( new ShortestJobFirstComparator(), 3); - assertNull(queue.offer(r1)); - assertNull(queue.offer(r2)); - assertNull(queue.offer(r3)); + assertNull(queue.offer(r1, 0)); + assertNull(queue.offer(r2, 0)); + assertNull(queue.offer(r3, 0)); // can not queue more requests as queue is full TaskWrapper r4 = createTaskWrapper(createRequest(4, 1, 0, 10, 100, 10), true, 100000); - assertEquals(r4, queue.offer(r4)); + assertEquals(r4, queue.offer(r4, 0)); } @Test(timeout = 60000) @@ -300,9 +300,9 @@ public void testWaitQueueComparatorParallelism() throws InterruptedException { EvictingPriorityBlockingQueue queue = new EvictingPriorityBlockingQueue<>( new FirstInFirstOutComparator(), 4); - assertNull(queue.offer(r1)); - assertNull(queue.offer(r2)); - assertNull(queue.offer(r3)); + assertNull(queue.offer(r1, 0)); + assertNull(queue.offer(r2, 0)); + assertNull(queue.offer(r3, 0)); assertEquals(r2, queue.take()); assertEquals(r3, queue.take()); diff --git llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestShortestJobFirstComparator.java llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestShortestJobFirstComparator.java index 0059d0c..b348bd6 100644 --- llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestShortestJobFirstComparator.java +++ llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestShortestJobFirstComparator.java @@ -35,16 +35,16 @@ public void testWaitQueueComparator() throws InterruptedException { TaskWrapper r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500, 600, "q5"), false, 1000000); EvictingPriorityBlockingQueue queue = new EvictingPriorityBlockingQueue<>( new ShortestJobFirstComparator(), 4); - assertNull(queue.offer(r1)); + assertNull(queue.offer(r1, 0)); assertEquals(r1, queue.peek()); - assertNull(queue.offer(r2)); + assertNull(queue.offer(r2, 0)); assertEquals(r1, queue.peek()); - assertNull(queue.offer(r3)); + assertNull(queue.offer(r3, 0)); assertEquals(r1, queue.peek()); - assertNull(queue.offer(r4)); + assertNull(queue.offer(r4, 0)); assertEquals(r1, queue.peek()); // this offer will be rejected - assertEquals(r5, queue.offer(r5)); + assertEquals(r5, queue.offer(r5, 0)); assertEquals(r1, queue.take()); assertEquals(r2, queue.take()); assertEquals(r3, queue.take()); @@ -57,16 +57,16 @@ public void testWaitQueueComparator() throws InterruptedException { r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500, 600, "q5"), true, 1000000); queue = new EvictingPriorityBlockingQueue( new ShortestJobFirstComparator(), 4); - assertNull(queue.offer(r1)); + assertNull(queue.offer(r1, 0)); assertEquals(r1, queue.peek()); - assertNull(queue.offer(r2)); + assertNull(queue.offer(r2, 0)); assertEquals(r1, queue.peek()); - assertNull(queue.offer(r3)); + assertNull(queue.offer(r3, 0)); assertEquals(r1, queue.peek()); - assertNull(queue.offer(r4)); + assertNull(queue.offer(r4, 0)); assertEquals(r1, queue.peek()); // this offer will be rejected - assertEquals(r5, queue.offer(r5)); + assertEquals(r5, queue.offer(r5, 0)); assertEquals(r1, queue.take()); assertEquals(r2, queue.take()); assertEquals(r3, queue.take()); @@ -79,16 +79,16 @@ public void testWaitQueueComparator() throws InterruptedException { r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500, 600, "q5"), true, 1000000); queue = new EvictingPriorityBlockingQueue( new 
ShortestJobFirstComparator(), 4); - assertNull(queue.offer(r1)); + assertNull(queue.offer(r1, 0)); assertEquals(r1, queue.peek()); - assertNull(queue.offer(r2)); + assertNull(queue.offer(r2, 0)); assertEquals(r1, queue.peek()); - assertNull(queue.offer(r3)); + assertNull(queue.offer(r3, 0)); assertEquals(r1, queue.peek()); - assertNull(queue.offer(r4)); + assertNull(queue.offer(r4, 0)); assertEquals(r1, queue.peek()); // offer accepted and r4 gets evicted - assertEquals(r4, queue.offer(r5)); + assertEquals(r4, queue.offer(r5, 0)); assertEquals(r1, queue.take()); assertEquals(r3, queue.take()); assertEquals(r5, queue.take()); @@ -101,16 +101,16 @@ public void testWaitQueueComparator() throws InterruptedException { r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500, 600, "q5"), true, 1000000); queue = new EvictingPriorityBlockingQueue( new ShortestJobFirstComparator(), 4); - assertNull(queue.offer(r1)); + assertNull(queue.offer(r1, 0)); assertEquals(r1, queue.peek()); - assertNull(queue.offer(r2)); + assertNull(queue.offer(r2, 0)); assertEquals(r1, queue.peek()); - assertNull(queue.offer(r3)); + assertNull(queue.offer(r3, 0)); assertEquals(r1, queue.peek()); - assertNull(queue.offer(r4)); + assertNull(queue.offer(r4, 0)); assertEquals(r1, queue.peek()); // offer accepted and r4 gets evicted - assertEquals(r4, queue.offer(r5)); + assertEquals(r4, queue.offer(r5, 0)); assertEquals(r1, queue.take()); assertEquals(r3, queue.take()); assertEquals(r5, queue.take()); @@ -123,16 +123,16 @@ public void testWaitQueueComparator() throws InterruptedException { r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500, 600, "q5"), true, 1000000); queue = new EvictingPriorityBlockingQueue( new ShortestJobFirstComparator(), 4); - assertNull(queue.offer(r1)); + assertNull(queue.offer(r1, 0)); assertEquals(r1, queue.peek()); - assertNull(queue.offer(r2)); + assertNull(queue.offer(r2, 0)); assertEquals(r1, queue.peek()); - assertNull(queue.offer(r3)); + assertNull(queue.offer(r3, 0)); assertEquals(r1, queue.peek()); - assertNull(queue.offer(r4)); + assertNull(queue.offer(r4, 0)); assertEquals(r1, queue.peek()); // offer accepted and r4 gets evicted - assertEquals(r4, queue.offer(r5)); + assertEquals(r4, queue.offer(r5, 0)); assertEquals(r1, queue.take()); assertEquals(r5, queue.take()); assertEquals(r2, queue.take()); @@ -145,16 +145,16 @@ public void testWaitQueueComparator() throws InterruptedException { r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500, 600, "q5"), true, 1000000); queue = new EvictingPriorityBlockingQueue( new ShortestJobFirstComparator(), 4); - assertNull(queue.offer(r1)); + assertNull(queue.offer(r1, 0)); assertEquals(r1, queue.peek()); - assertNull(queue.offer(r2)); + assertNull(queue.offer(r2, 0)); assertEquals(r2, queue.peek()); - assertNull(queue.offer(r3)); + assertNull(queue.offer(r3, 0)); assertEquals(r2, queue.peek()); - assertNull(queue.offer(r4)); + assertNull(queue.offer(r4, 0)); assertEquals(r2, queue.peek()); // offer accepted, r1 evicted - assertEquals(r1, queue.offer(r5)); + assertEquals(r1, queue.offer(r5, 0)); assertEquals(r2, queue.take()); assertEquals(r3, queue.take()); assertEquals(r4, queue.take()); @@ -170,9 +170,9 @@ public void testWaitQueueComparatorWithinDagPriority() throws InterruptedExcepti EvictingPriorityBlockingQueue queue = new EvictingPriorityBlockingQueue<>( new ShortestJobFirstComparator(), 4); - assertNull(queue.offer(r1)); - assertNull(queue.offer(r2)); - assertNull(queue.offer(r3)); + assertNull(queue.offer(r1, 0)); + 
assertNull(queue.offer(r2, 0)); + assertNull(queue.offer(r3, 0)); assertEquals(r2, queue.take()); assertEquals(r3, queue.take()); @@ -188,13 +188,13 @@ public void testWaitQueueComparatorWithinSameDagPriority() throws InterruptedExc EvictingPriorityBlockingQueue queue = new EvictingPriorityBlockingQueue<>( new ShortestJobFirstComparator(), 3); - assertNull(queue.offer(r1)); - assertNull(queue.offer(r2)); - assertNull(queue.offer(r3)); + assertNull(queue.offer(r1, 0)); + assertNull(queue.offer(r2, 0)); + assertNull(queue.offer(r3, 0)); // can not queue more requests as queue is full TaskWrapper r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 1, 0, 10, 100, 10), true, 100000); - assertEquals(r4, queue.offer(r4)); + assertEquals(r4, queue.offer(r4, 0)); } @Test(timeout = 60000) @@ -206,9 +206,9 @@ public void testWaitQueueComparatorParallelism() throws InterruptedException { EvictingPriorityBlockingQueue queue = new EvictingPriorityBlockingQueue<>( new ShortestJobFirstComparator(), 4); - assertNull(queue.offer(r1)); - assertNull(queue.offer(r2)); - assertNull(queue.offer(r3)); + assertNull(queue.offer(r1, 0)); + assertNull(queue.offer(r2, 0)); + assertNull(queue.offer(r3, 0)); assertEquals(r2, queue.take()); assertEquals(r3, queue.take()); @@ -224,9 +224,9 @@ public void testWaitQueueComparatorAging() throws InterruptedException { EvictingPriorityBlockingQueue queue = new EvictingPriorityBlockingQueue<>( new ShortestJobFirstComparator(), 4); - assertNull(queue.offer(r1)); - assertNull(queue.offer(r2)); - assertNull(queue.offer(r3)); + assertNull(queue.offer(r1, 0)); + assertNull(queue.offer(r2, 0)); + assertNull(queue.offer(r3, 0)); assertEquals(r1, queue.take()); assertEquals(r2, queue.take()); @@ -241,9 +241,9 @@ public void testWaitQueueComparatorAging() throws InterruptedException { queue = new EvictingPriorityBlockingQueue<>(new ShortestJobFirstComparator(), 4); - assertNull(queue.offer(r1)); - assertNull(queue.offer(r2)); - assertNull(queue.offer(r3)); + assertNull(queue.offer(r1, 0)); + assertNull(queue.offer(r2, 0)); + assertNull(queue.offer(r3, 0)); assertEquals(r2, queue.take()); assertEquals(r3, queue.take());
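
Note on the scheduling change above: the wait queue now admits up to waitQueueSize + (maxParallelExecutors - runningFragmentCount) entries, and SchedulerFragmentCompletingListener.fragmentCompleting() decrements runningFragmentCount as soon as a fragment reports its terminal state to the AM, before its executor thread actually returns. The standalone sketch below is illustrative only (it is not part of the patch; the class and method names in it are made up), using the same 1-executor / 2-slot configuration as the new testWaitQueueAcceptAfterAMTaskReport test to show how a "completing" fragment frees a wait-queue slot ahead of its executor slot.

public class WaitQueueCapacitySketch {

  // Mirrors the check in EvictingPriorityBlockingQueue.offer(e, additionalElementsAllowed):
  // accept while currentSize < waitQueueSize + additionalElementsAllowed.
  static boolean canAccept(int currentSize, int waitQueueSize, int additionalElementsAllowed) {
    return currentSize < waitQueueSize + additionalElementsAllowed;
  }

  public static void main(String[] args) {
    int waitQueueSize = 2;          // wait queue size used by the new test
    int maxParallelExecutors = 1;   // single executor thread
    int runningFragmentCount = 1;   // r1 running; r2 and r3 already fill the wait queue

    // r4 arrives: capacity = 2 + (1 - 1) = 2, the queue already holds 2 -> rejected (prints false).
    System.out.println(canAccept(2, waitQueueSize, maxParallelExecutors - runningFragmentCount));

    // r1 reports SUCCESS to the AM; fragmentCompleting() drops the running count
    // even though the executor thread has not returned yet.
    runningFragmentCount--;

    // r4 offered again: capacity = 2 + (1 - 0) = 3 -> accepted while r1 still occupies
    // the executor slot (prints true).
    System.out.println(canAccept(2, waitQueueSize, maxParallelExecutors - runningFragmentCount));
  }
}

Once the executor callback eventually fires, updateFallOffStats() records the gap between fragmentCompleting() and actual completion into the new ExecutorFallOff*TimeLost counters and *MaxTimeLost gauges, which is the "time lost" the metric descriptions refer to.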