diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java index 2fe1017..3d59702 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java @@ -217,7 +217,7 @@ public Boolean call() throws Exception { if (response.shouldDie) { // AM sent a shouldDie=true - LOG.info("Asked to die via task heartbeat"); + LOG.info("Asked to die via task heartbeat: {}", task.getTaskAttemptID()); return false; } else { if (response.numEvents < maxEventsToGet) { @@ -297,7 +297,7 @@ private synchronized ResponseWrapper heartbeat(Collection eventsArg) t } if (response.shouldDie()) { - LOG.info("Received should die response from AM"); + LOG.info("Received should die response from AM: {}", task.getTaskAttemptID()); askedToDie.set(true); return new ResponseWrapper(true, 1); } diff --git llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java index cfcf0f0..bbf275e 100644 --- llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java +++ llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java @@ -182,6 +182,8 @@ public int compare(Priority o1, Priority o2) { private final SchedulerTimeoutMonitor timeoutMonitor; private ScheduledFuture timeoutFuture; + private final AtomicInteger assignedTaskCounter = new AtomicInteger(0); + private final LlapRegistryService registry = new LlapRegistryService(false); private volatile ListenableFuture nodeEnablerFuture; @@ -362,9 +364,11 @@ public void onUpdate(ServiceInstance serviceInstance) { @Override public void onRemove(ServiceInstance serviceInstance) { NodeReport nodeReport = constructNodeReport(serviceInstance, false); + LOG.info("Sending out nodeReport for onRemove: {}", nodeReport); getContext().nodesUpdated(Collections.singletonList(nodeReport)); instanceToNodeMap.remove(serviceInstance.getWorkerIdentity()); - LOG.info("Removed node with identity: {}", serviceInstance.getWorkerIdentity()); + LOG.info("Removed node with identity: {} due to RegistryNotification. currentActiveInstances={}", + serviceInstance.getWorkerIdentity(), activeInstances.size()); if (metrics != null) { metrics.setClusterNodeCount(activeInstances.size()); } @@ -650,6 +654,7 @@ public boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEnd NodeInfo nodeInfo = taskInfo.assignedNode; assert nodeInfo != null; + // endReason shows up as OTHER for CONTAINER_TIME_OUT LOG.info("Processing de-allocate request for task={}, state={}, endReason={}", taskInfo.task, taskInfo.getState(), endReason); // Re-enable the node if preempted @@ -680,6 +685,8 @@ public boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEnd } else { // Task Failed nodeInfo.registerUnsuccessfulTaskEnd(false); + // TODO Include EXTERNAL_PREEMPTION in this list? + // TODO Differentiate between EXTERNAL_PREEMPTION_WAITQUEU vs EXTERNAL_PREEMPTION_FINISHABLE? if (endReason != null && EnumSet .of(TaskAttemptEndReason.EXECUTOR_BUSY, TaskAttemptEndReason.COMMUNICATION_ERROR) .contains(endReason)) { @@ -762,12 +769,11 @@ private SelectHostResult selectHost(TaskInfo request) { if (nodeInfo != null) { if (nodeInfo.canAcceptTask()) { // Successfully scheduled. - LOG.info( - "Assigning {} when looking for {}." - + " local=true FirstRequestedHost={}, #prefLocations={}", nodeInfo - .toShortString(), host, (prefHostCount == 0) + - (requestedHosts.length > 1 ? ", #prefLocations=" + requestedHosts.length : - "")); + LOG.info("Assigning {} when looking for {}." + + " local=true FirstRequestedHost={}, #prefLocations={}", + nodeInfo.toShortString(), host, + (prefHostCount == 0), + requestedHosts.length); return new SelectHostResult(nodeInfo); } else { // The node cannot accept a task at the moment. @@ -918,12 +924,19 @@ private void addNode(NodeInfo node, ServiceInstance serviceInstance) { NodeReport nodeReport = constructNodeReport(serviceInstance, true); getContext().nodesUpdated(Collections.singletonList(nodeReport)); + // When the same node goes away and comes back... the old entry will be lost - which means + // we don't know how many fragments we have actually scheduled on this node. + + // Replacing it is the right thing to do though, since we expect the AM to kill all the fragments running on the node, via timeouts. + // De-allocate messages coming in from the old node are sent to the NodeInfo instance for the old node. + instanceToNodeMap.put(node.getNodeIdentity(), node); if (metrics != null) { metrics.setClusterNodeCount(activeInstances.size()); } // Trigger scheduling since a new node became available. - LOG.info("Adding new node: {}", node); + LOG.info("Adding new node: {}. TotalNodeCount={}. activeInstances.size={}", + node, instanceToNodeMap.size(), activeInstances.size()); trySchedulingPendingTasks(); } @@ -933,6 +946,9 @@ private void reenableDisabledNode(NodeInfo nodeInfo) { LOG.info("Attempting to re-enable node: " + nodeInfo.toShortString()); if (activeInstances.getInstance(nodeInfo.getNodeIdentity()) != null) { nodeInfo.enableNode(); + if (metrics != null) { + metrics.setDisabledNodeCount(disabledNodesQueue.size()); + } } else { if (LOG.isInfoEnabled()) { LOG.info( @@ -951,13 +967,11 @@ private void reenableDisabledNode(NodeInfo nodeInfo) { * @param nodeInfo the node to be re-enabled */ private void queueNodeForReEnablement(final NodeInfo nodeInfo) { - nodeInfo.enableNode(); if ( disabledNodesQueue.remove(nodeInfo)) { + LOG.info("Queueing node for re-enablement: {}", nodeInfo.toShortString()); + nodeInfo.resetExpireInformation(); disabledNodesQueue.add(nodeInfo); } - if (metrics != null) { - metrics.setDisabledNodeCount(disabledNodesQueue.size()); - } } private void disableNode(NodeInfo nodeInfo, boolean isCommFailure) { @@ -1163,9 +1177,9 @@ protected void schedulePendingTasks() { // Preempt on specific host boolean shouldPreempt = true; for (String host : potentialHosts) { - // Preempt only if there are not pending preemptions on the same host + // Preempt only if there are no pending preemptions on the same host // When the premption registers, the request at the highest priority will be given the slot, - // even if the initial request was for some other task. + // even if the initial preemption was caused by some other task. // TODO Maybe register which task the preemption was for, to avoid a bad non-local allocation. MutableInt pendingHostPreemptions = pendingPreemptionsPerHost.get(host); if (pendingHostPreemptions != null && pendingHostPreemptions.intValue() > 0) { @@ -1178,7 +1192,7 @@ protected void schedulePendingTasks() { } if (shouldPreempt) { if (LOG.isDebugEnabled()) { - LOG.debug("Preempting for {} on potential hosts={}. TotalPendingPreemptions={}", + LOG.debug("Attempting to preempt for {} on potential hosts={}. TotalPendingPreemptions={}", taskInfo.task, Arrays.toString(potentialHosts), pendingPreemptions.get()); } preemptTasks(entry.getKey().getPriority(), 1, potentialHosts); @@ -1194,7 +1208,11 @@ protected void schedulePendingTasks() { LOG.debug("Attempting to preempt on any host for task={}, pendingPreemptions={}", taskInfo.task, pendingPreemptions.get()); if (pendingPreemptions.get() == 0) { - LOG.info("Preempting for task={} on any available host", taskInfo.task); + if (LOG.isDebugEnabled()) { + LOG.debug( + "Attempting to preempt for task={}, priority={} on any available host", + taskInfo.task, taskInfo.priority); + } preemptTasks(entry.getKey().getPriority(), 1, null); } else { if (LOG.isDebugEnabled()) { @@ -1263,7 +1281,10 @@ private ScheduleResult scheduleTask(TaskInfo taskInfo, Resource totalResource) { nodeInfo.getServiceAddress()); writeLock.lock(); // While updating local structures try { - LOG.info("Assigned task={} on node={}, to container={}", + // The canAccept part of this log message does not account for this allocation. + assignedTaskCounter.incrementAndGet(); + LOG.info("Assigned #{}, task={} on node={}, to container={}", + assignedTaskCounter.get(), taskInfo, nodeInfo.toShortString(), container.getId()); dagStats.registerTaskAllocated(taskInfo.requestedHosts, taskInfo.requestedRacks, nodeInfo.getHost()); @@ -1321,7 +1342,8 @@ private void preemptTasks(int forPriority, int numTasksToPreempt, String []poten } } else { // No tasks qualify as preemptable - LOG.debug("No tasks qualify as killable to schedule tasks at priority {}", forPriority); + LOG.debug("No tasks qualify as killable to schedule tasks at priority {}. Current priority={}", + forPriority, entryAtPriority.getKey()); break; } } @@ -1572,6 +1594,8 @@ public void shutdown() { private final LlapTaskSchedulerMetrics metrics; private final Resource resourcePerExecutor; + private final String shortStringBase; + /** * Create a NodeInfo bound to a service instance * @param serviceInstance the associated serviceInstance @@ -1612,6 +1636,7 @@ public void shutdown() { if (metrics != null) { metrics.incrSchedulableTasksCount(numSchedulableTasks); } + shortStringBase = setupShortStringBase(); } @@ -1635,12 +1660,16 @@ public Resource getResourcePerExecutor() { return resourcePerExecutor; } - void enableNode() { + void resetExpireInformation() { expireTimeMillis = -1; - disabled = false; hadCommFailure = false; } + void enableNode() { + resetExpireInformation(); + disabled = false; + } + void disableNode(boolean commFailure) { long duration = blacklistConf.minDelay; long currentTime = clock.getTime(); @@ -1662,7 +1691,7 @@ void disableNode(boolean commFailure) { } if (LOG.isInfoEnabled()) { LOG.info("Disabling instance {} for {} milli-seconds. commFailure={}", - serviceInstance, + toShortString(), delayTime, commFailure); } expireTimeMillis = currentTime + delayTime; @@ -1716,13 +1745,17 @@ boolean hadCommFailure() { return hadCommFailure; } + boolean _canAccepInternal() { + return !hadCommFailure && !disabled + &&(numSchedulableTasks == -1 || ((numSchedulableTasks - numScheduledTasks) > 0)); + } + int canAcceptCounter = 0; /* Returning true does not guarantee that the task will run, considering other queries may be running in the system. Also depends upon the capacity usage configuration */ boolean canAcceptTask() { - boolean result = !hadCommFailure && !disabled - &&(numSchedulableTasks == -1 || ((numSchedulableTasks - numScheduledTasks) > 0)); + boolean result = _canAccepInternal(); if (LOG.isTraceEnabled()) { LOG.trace(constructCanAcceptLogResult(result)); } @@ -1763,6 +1796,10 @@ public int compareTo(Delayed o) { } } + private String setupShortStringBase() { + return "{" + serviceInstance.getHost() + ":" + serviceInstance.getRpcPort() + ", id=" + getNodeIdentity(); + } + @Override public String toString() { return "NodeInfo{" + "instance=" + serviceInstance @@ -1777,11 +1814,17 @@ public String toString() { } private String toShortString() { - return "{" + serviceInstance.getHost() + ":" + - serviceInstance.getRpcPort() + ", id=" + getNodeIdentity() + - ", stc=" + numSchedulableTasks + "}"; + StringBuilder sb = new StringBuilder(); + sb.append(", canAcceptTask=").append(_canAccepInternal()); + sb.append(", st=").append(numScheduledTasks); + sb.append(", ac=").append((numSchedulableTasks - numScheduledTasks)); + sb.append(", commF=").append(hadCommFailure); + sb.append(", disabled=").append(disabled); + sb.append("}"); + return shortStringBase + sb.toString(); } + } @VisibleForTesting