diff --git llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java index 8d9a98a..49ab59a 100644 --- llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java +++ llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java @@ -410,8 +410,8 @@ public void unregister() throws IOException { public DynamicServiceInstance(ServiceRecord srv) throws IOException { this.srv = srv; - if (LOG.isDebugEnabled()) { - LOG.debug("Working with ServiceRecord: {}", srv); + if (LOG.isTraceEnabled()) { + LOG.trace("Working with ServiceRecord: {}", srv); } final Endpoint shuffle = srv.getInternalEndpoint(IPC_SHUFFLE); diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java index 4ea3b0b..adc86ea 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java @@ -19,6 +19,9 @@ import java.util.Comparator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * Bounded priority queue that evicts the last element based on priority order specified * through comparator. Elements that are added to the queue are sorted based on the specified @@ -27,6 +30,10 @@ * returned back. If the queue is not full, new element will be added to queue and null is returned. */ public class EvictingPriorityBlockingQueue { + + private static final Logger LOG = + LoggerFactory.getLogger(EvictingPriorityBlockingQueue.class); + private final PriorityBlockingDeque deque; private final Comparator comparator; @@ -42,7 +49,14 @@ public synchronized E offer(E e) { E last = deque.peekLast(); if (comparator.compare(e, last) < 0) { deque.removeLast(); - deque.offer(e); + if (!deque.offer(e)) { + LOG.error( + "Failed to insert element into queue with capacity available. size={}, element={}", + size(), e); + throw new RuntimeException( + "Failed to insert element into queue with capacity available. size=" + + size()); + } return last; } return e; diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/PriorityBlockingDeque.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/PriorityBlockingDeque.java index db2ab16..e27efa5 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/PriorityBlockingDeque.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/PriorityBlockingDeque.java @@ -107,7 +107,7 @@ private boolean innerAdd(E e) { } list.add(insertionPoint, e); - // Collections.sort(list, comparator); + // Inserted in sort order. Hence no explict sort. notEmpty.signal(); return true; @@ -178,6 +178,7 @@ public boolean offerFirst(E e) { /** * @throws NullPointerException {@inheritDoc} */ + @Override public boolean offerLast(E e) { if (e == null) throw new NullPointerException(); lock.lock(); @@ -450,6 +451,7 @@ public boolean add(E e) { /** * @throws NullPointerException if the specified element is null */ + @Override public boolean offer(E e) { return offerLast(e); } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java index 5c7d4ef..1080d3e 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java @@ -203,11 +203,12 @@ boolean registerForUpdates(FinishableStateUpdateHandler handler, sourceToEntity.put(source, entityInfo); } - if (lastFinishableState != fragmentInfo.canFinish()) { + if (lastFinishableState == fragmentInfo.canFinish()) { + // State has not changed. + return true; + } else { entityInfo.setLastFinishableState(fragmentInfo.canFinish()); return false; - } else { - return true; } } finally { lock.unlock(); diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java index f094039..a7d7981 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java @@ -231,6 +231,8 @@ void fragmentComplete(QueryFragmentInfo fragmentInfo) { deleteDelay); queryInfoMap.remove(queryIdentifier); if (queryInfo == null) { + // One case where this happens is when a query is killed via an explicit signal, and then + // another message is received from teh AMHeartbeater. LOG.warn("Ignoring query complete for unknown dag: {}", queryIdentifier); return Collections.emptyList(); } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java index cae2591..58863af 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java @@ -346,11 +346,12 @@ public SubmissionState schedule(TaskRunnerCallable task) { // The wait queue should be able to fit at least (waitQueue + currentFreeExecutor slots) canFinish = taskWrapper.getTaskRunnerCallable().canFinish(); evictedTask = waitQueue.offer(taskWrapper); + // Finishable state is checked on the task, via an explicit query to the TaskRunnerCallable // null evicted task means offer accepted // evictedTask is not equal taskWrapper means current task is accepted and it evicted // some other task - if (evictedTask == null || evictedTask != taskWrapper) { + if (evictedTask == null || !evictedTask.equals(taskWrapper)) { knownTasks.put(taskWrapper.getRequestId(), taskWrapper); taskWrapper.setIsInWaitQueue(true); if (isDebugEnabled) { @@ -379,6 +380,18 @@ public SubmissionState schedule(TaskRunnerCallable task) { } return result; } + + // Register for notifications inside the lock. Should avoid races with unregisterForNotifications + // happens in a different Submission thread. i.e. Avoid register running for this task + // after some other submission has evicted it. + boolean stateChanged = !taskWrapper.maybeRegisterForFinishedStateNotifications(canFinish); + if (stateChanged) { + if (isDebugEnabled) { + LOG.debug("Finishable state of {} updated to {} during registration for state updates", + taskWrapper.getRequestId(), !canFinish); + } + finishableStateUpdated(taskWrapper, !canFinish); + } } // At this point, the task has been added into the queue. It may have caused an eviction for @@ -387,27 +400,25 @@ public SubmissionState schedule(TaskRunnerCallable task) { // This registration has to be done after knownTasks has been populated. // Register for state change notifications so that the waitQueue can be re-ordered correctly // if the fragment moves in or out of the finishable state. - boolean stateChanged = taskWrapper.maybeRegisterForFinishedStateNotifications(canFinish); - if (stateChanged) { - if (isDebugEnabled) { - LOG.debug("Finishable state of {} updated to {} during registration for state updates", - taskWrapper.getRequestId(), !canFinish); - } - finishableStateUpdated(taskWrapper, !canFinish); - } if (isDebugEnabled) { LOG.debug("Wait Queue: {}", waitQueue); } + if (evictedTask != null) { - knownTasks.remove(evictedTask.getRequestId()); - evictedTask.maybeUnregisterForFinishedStateNotifications(); - evictedTask.setIsInWaitQueue(false); - evictedTask.getTaskRunnerCallable().killTask(); if (isInfoEnabled) { LOG.info("{} evicted from wait queue in favor of {} because of lower priority", evictedTask.getRequestId(), task.getRequestId()); } + try { + knownTasks.remove(evictedTask.getRequestId()); + evictedTask.maybeUnregisterForFinishedStateNotifications(); + evictedTask.setIsInWaitQueue(false); + } finally { + // This is dealing with tasks from a different submission, and cause the kill + // to go out before the previous submissions has completed. Handled in the AM + evictedTask.getTaskRunnerCallable().killTask(); + } if (metrics != null) { metrics.incrTotalEvictedFromWaitQueue(); } @@ -769,6 +780,7 @@ public boolean maybeRegisterForFinishedStateNotifications( return taskRunnerCallable.getFragmentInfo() .registerForFinishableStateUpdates(this, currentFinishableState); } else { + // State has not changed / already registered for notifications. return true; } } @@ -830,6 +842,29 @@ public void finishableStateUpdated(boolean finishableState) { taskRunnerCallable.getRequestId(), finishableState); taskExecutorService.finishableStateUpdated(this, finishableState); } + + + // TaskWrapper is used in structures, as well as for ordering using Comparators + // in the waitQueue. Avoid Object comparison. + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + TaskWrapper that = (TaskWrapper) o; + + return taskRunnerCallable.getRequestId() + .equals(that.taskRunnerCallable.getRequestId()); + } + + @Override + public int hashCode() { + return taskRunnerCallable.getRequestId().hashCode(); + } } private static class ExecutorThreadFactory implements ThreadFactory { diff --git llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java index c692581..1de4bfe 100644 --- llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java +++ llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java @@ -752,18 +752,28 @@ public String getUniqueNodeId(TezTaskAttemptID attemptId) { public void registerTaskSubmittedToNode( TezTaskAttemptID taskAttemptID, String uniqueNodeId) { - String prev = uniqueNodeMap.putIfAbsent(taskAttemptID, uniqueNodeId); - if (prev != null) { - LOG.warn("Replaced the unique node mapping for task from " + prev + " to " + uniqueNodeId); + synchronized (attemptToNodeMap) { + if (attemptToNodeMap.containsKey(taskAttemptID)) { + // Register only if the attempt is known. In case an unregister call + // came in before the register call. + String prev = uniqueNodeMap.putIfAbsent(taskAttemptID, uniqueNodeId); + if (prev != null) { + LOG.warn("Replaced the unique node mapping for task from " + prev + + " to " + uniqueNodeId); + } + } } } void unregisterTaskAttempt(TezTaskAttemptID attemptId) { uniqueNodeMap.remove(attemptId); - LlapNodeId llapNodeId = attemptToNodeMap.remove(attemptId); - if (llapNodeId == null) { - // Possible since either container / task can be unregistered. - return; + LlapNodeId llapNodeId; + synchronized (attemptToNodeMap) { + llapNodeId = attemptToNodeMap.remove(attemptId); + if (llapNodeId == null) { + // Possible since either container / task can be unregistered. + return; + } } BiMap bMap = nodeMap.get(llapNodeId);