diff --git llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstance.java llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstance.java index 9004d3c..081995c 100644 --- llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstance.java +++ llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstance.java @@ -66,13 +66,6 @@ */ public int getOutputFormatPort(); - /** - * Return the last known state (without refreshing) - * - * @return - */ - - public boolean isAlive(); /** * Config properties of the Service Instance (llap.daemon.*) diff --git llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java index 1e8c895..0544038 100644 --- llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java +++ llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java @@ -16,6 +16,11 @@ import java.util.Collection; import java.util.Set; +/** + * Note: For most of the implementations, there's no guarantee that the ServiceInstance returned by + * one invocation is the same as the instance returned by another invocation. e.g. the ZK registry + * returns a new ServiceInstance object each time a getInstance call is made. 
+ */ public interface ServiceInstanceSet { /** diff --git llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/InactiveServiceInstance.java llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/InactiveServiceInstance.java index 79b7d51..9f2f3b4 100644 --- llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/InactiveServiceInstance.java +++ llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/InactiveServiceInstance.java @@ -31,11 +31,6 @@ public String getWorkerIdentity() { } @Override - public boolean isAlive() { - return false; - } - - @Override public String getHost() { return null; } diff --git llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java index bbfcbf6..10ff82e 100644 --- llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java +++ llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java @@ -184,11 +184,6 @@ public String getServicesAddress() { } @Override - public boolean isAlive() { - return true; - } - - @Override public Map getProperties() { Map properties = new HashMap<>(srv); // no worker identity diff --git llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java index 59f7c9e..525aadb 100644 --- llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java +++ llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java @@ -22,12 +22,9 @@ import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; -import java.util.LinkedHashMap; import 
java.util.LinkedList; import java.util.List; import java.util.Map; @@ -60,7 +57,6 @@ import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet; import org.apache.hadoop.hive.llap.registry.ServiceInstanceStateChangeListener; import org.apache.hadoop.hive.llap.registry.ServiceRegistry; -import org.apache.hadoop.registry.client.binding.RegistryPathUtils; import org.apache.hadoop.registry.client.binding.RegistryTypeUtils; import org.apache.hadoop.registry.client.binding.RegistryUtils; import org.apache.hadoop.registry.client.binding.RegistryUtils.ServiceRecordMarshal; @@ -404,7 +400,6 @@ public void unregister() throws IOException { private class DynamicServiceInstance implements ServiceInstance { private final ServiceRecord srv; - private boolean alive = true; private final String host; private final int rpcPort; private final int mngPort; @@ -470,17 +465,6 @@ public String getServicesAddress() { } @Override - public boolean isAlive() { - return alive; - } - - public void kill() { - // May be possible to generate a notification back to the scheduler from here. - LOG.info("Killing service instance: " + this); - this.alive = false; - } - - @Override public Map getProperties() { return srv.attributes(); } @@ -494,7 +478,7 @@ public Resource getResource() { @Override public String toString() { - return "DynamicServiceInstance [alive=" + alive + ", host=" + host + ":" + rpcPort + + return "DynamicServiceInstance [id=" + getWorkerIdentity() + ", host=" + host + ":" + rpcPort + " with resources=" + getResource() + ", shufflePort=" + getShufflePort() + ", servicesAddress=" + getServicesAddress() + ", mgmtPort=" + getManagementPort() + "]"; } @@ -509,8 +493,8 @@ public int getOutputFormatPort() { return outputFormatPort; } - // Relying on the identity hashCode and equality, since refreshing instances retains the old copy - // of an already known instance. + // TODO: This needs a hashCode/equality implementation if used as a key in various structures. 
+ // A new ServiceInstance is created each time. } private class DynamicServiceInstanceSet implements ServiceInstanceSet { diff --git llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java index 288a8eb..15a81da 100644 --- llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java +++ llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java @@ -279,9 +279,7 @@ private ServiceInstance selectServiceInstance(Set serviceInstan // Get the first live service instance for (ServiceInstance serviceInstance : serviceInstances) { - if (serviceInstance.isAlive()) { - return serviceInstance; - } + return serviceInstance; } LOG.info("No live service instances were found"); diff --git llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java index 3f0dde5..158772b 100644 --- llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java +++ llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java @@ -18,6 +18,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.Comparator; import java.util.EnumSet; import java.util.HashMap; @@ -70,6 +71,8 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.util.Clock; @@ -82,6 +85,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.tez.common.TezUtils; import 
org.apache.tez.dag.api.TezUncheckedException; +import org.apache.tez.serviceplugins.api.DagInfo; import org.apache.tez.serviceplugins.api.ServicePluginErrorDefaults; import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; import org.apache.tez.serviceplugins.api.TaskScheduler; @@ -316,8 +320,8 @@ public void start() throws IOException { registry.registerStateChangeListener(new NodeStateChangeListener()); activeInstances = registry.getInstances(); for (ServiceInstance inst : activeInstances.getAll()) { - addNode(inst, new NodeInfo(inst, nodeBlacklistConf, clock, numSchedulableTasksPerNode, - metrics)); + addNode(new NodeInfo(inst, nodeBlacklistConf, clock, + numSchedulableTasksPerNode, metrics), inst); } } finally { writeLock.unlock(); @@ -329,22 +333,31 @@ public void start() throws IOException { @Override public void onCreate(ServiceInstance serviceInstance) { - addNode(serviceInstance, new NodeInfo(serviceInstance, nodeBlacklistConf, clock, - numSchedulableTasksPerNode, metrics)); - LOG.info("Added node with identity: {}", serviceInstance.getWorkerIdentity()); + LOG.info("Added node with identity: {} as a result of registry callback", + serviceInstance.getWorkerIdentity()); + addNode(new NodeInfo(serviceInstance, nodeBlacklistConf, clock, + numSchedulableTasksPerNode, metrics), serviceInstance); } @Override public void onUpdate(ServiceInstance serviceInstance) { - instanceToNodeMap.put(serviceInstance.getWorkerIdentity(), new NodeInfo(serviceInstance, - nodeBlacklistConf, clock, numSchedulableTasksPerNode, metrics)); - LOG.info("Updated node with identity: {}", serviceInstance.getWorkerIdentity()); + // TODO In what situations will this be invoked? + LOG.warn( + "Not expecting Updates from the registry. Received update for instance={}. Ignoring", + serviceInstance); +// Replacing NodeInfo means we end up discarding whatever state was known about that node.
+// instanceToNodeMap.put(serviceInstance.getWorkerIdentity(), new NodeInfo(serviceInstance, +// nodeBlacklistConf, clock, numSchedulableTasksPerNode, metrics)); +// +// +// LOG.info("Updated node with identity: {}", serviceInstance.getWorkerIdentity()); } @Override public void onRemove(ServiceInstance serviceInstance) { - // FIXME: disabling this for now - // instanceToNodeMap.remove(serviceInstance.getWorkerIdentity()); + NodeReport nodeReport = constructNodeReport(serviceInstance, false); + getContext().nodesUpdated(Collections.singletonList(nodeReport)); + instanceToNodeMap.remove(serviceInstance.getWorkerIdentity()); LOG.info("Removed node with identity: {}", serviceInstance.getWorkerIdentity()); if (metrics != null) { metrics.setClusterNodeCount(activeInstances.size()); @@ -447,12 +460,10 @@ public Resource getTotalResources() { try { int numInstancesFound = 0; for (ServiceInstance inst : activeInstances.getAll()) { - if (inst.isAlive()) { - Resource r = inst.getResource(); - memory += r.getMemory(); - vcores += r.getVirtualCores(); - numInstancesFound++; - } + Resource r = inst.getResource(); + memory += r.getMemory(); + vcores += r.getVirtualCores(); + numInstancesFound++; } if (LOG.isDebugEnabled()) { LOG.debug("GetTotalResources: numInstancesFound={}, totalMem={}, totalVcores={}", @@ -475,19 +486,26 @@ public Resource getAvailableResources() { // need a state store eventually for current state & measure backoffs int memory = 0; int vcores = 0; + readLock.lock(); try { - for (Entry entry : instanceToNodeMap.entrySet()) { - if (entry.getValue().getServiceInstance().isAlive() && !entry.getValue().isDisabled()) { - Resource r = entry.getValue().getServiceInstance().getResource(); + int numInstancesFound = 0; + for (ServiceInstance inst : activeInstances.getAll()) { + NodeInfo nodeInfo = instanceToNodeMap.get(inst.getWorkerIdentity()); + if (nodeInfo != null && !nodeInfo.isDisabled()) { + Resource r = inst.getResource(); memory += r.getMemory(); vcores += 
r.getVirtualCores(); + numInstancesFound++; } } + if (LOG.isDebugEnabled()) { + LOG.debug("GetAvailableResources: numInstancesFound={}, totalMem={}, totalVcores={}", + numInstancesFound, memory, vcores); + } } finally { readLock.unlock(); } - return Resource.newInstance(memory, vcores); } @@ -495,13 +513,7 @@ public Resource getAvailableResources() { public int getClusterNodeCount() { readLock.lock(); try { - int n = 0; - for (ServiceInstance inst : activeInstances.getAll()) { - if (inst.isAlive()) { - n++; - } - } - return n; + return activeInstances.getAll().size(); } finally { readLock.unlock(); } @@ -516,17 +528,22 @@ public void dagComplete() { metrics.incrCompletedDagCount(); } dagStats = new StatsPerDag(); + // TODO Cleanup pending tasks etc, so that the next dag is not affected. } @Override public void blacklistNode(NodeId nodeId) { LOG.info("BlacklistNode not supported"); + // TODO Disable blacklisting in Tez when using LLAP, until this is properly supported. + // Blacklisting can cause containers to move to a terminating state, which can cause attempt to be marked as failed. + // This becomes problematic when we set #allowedFailures to 0 // TODO HIVE-13484 What happens when we try scheduling a task on a node that Tez at this point thinks is blacklisted. } @Override public void unblacklistNode(NodeId nodeId) { LOG.info("unBlacklistNode not supported"); + // TODO: See comments under blacklistNode. 
} @Override @@ -598,26 +615,20 @@ public boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEnd } return false; } - ServiceInstance assignedInstance = taskInfo.assignedInstance; - assert assignedInstance != null; - - NodeInfo nodeInfo = instanceToNodeMap.get(assignedInstance.getWorkerIdentity()); + NodeInfo nodeInfo = taskInfo.assignedNode; assert nodeInfo != null; - LOG.info("Processing de-allocate request for task={}, state={}, endReason={}", taskInfo.task, taskInfo.getState(), endReason); // Re-enable the node if preempted if (taskInfo.getState() == TaskInfo.State.PREEMPTED) { LOG.info("Processing deallocateTask for {} which was preempted, EndReason={}", task, endReason); - unregisterPendingPreemption(taskInfo.assignedInstance.getHost()); + unregisterPendingPreemption(taskInfo.assignedNode.getHost()); nodeInfo.registerUnsuccessfulTaskEnd(true); if (nodeInfo.isDisabled()) { - // Re-enable the node. If a task succeeded, a slot may have become available. - // Also reset commFailures since a task was able to communicate back and indicate success. - nodeInfo.enableNode(); - // Re-insert into the queue to force the poll thread to remove the element. - reinsertNodeInfo(nodeInfo); + // Re-enable the node, if a task completed due to preemption. Capacity has become available, + // and we may have been able to communicate with the node. + queueNodeForReEnablement(nodeInfo); } // In case of success, trigger a scheduling run for pending tasks. trySchedulingPendingTasks(); @@ -631,9 +642,7 @@ public boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEnd if (nodeInfo.isDisabled()) { // Re-enable the node. If a task succeeded, a slot may have become available. // Also reset commFailures since a task was able to communicate back and indicate success. - nodeInfo.enableNode(); - // Re-insert into the queue to force the poll thread to remove the element. 
- reinsertNodeInfo(nodeInfo); + queueNodeForReEnablement(nodeInfo); } // In case of success, trigger a scheduling run for pending tasks. trySchedulingPendingTasks(); @@ -644,21 +653,21 @@ public boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEnd .of(TaskAttemptEndReason.EXECUTOR_BUSY, TaskAttemptEndReason.COMMUNICATION_ERROR) .contains(endReason)) { if (endReason == TaskAttemptEndReason.COMMUNICATION_ERROR) { - dagStats.registerCommFailure(taskInfo.assignedInstance.getHost()); + dagStats.registerCommFailure(taskInfo.assignedNode.getHost()); } else if (endReason == TaskAttemptEndReason.EXECUTOR_BUSY) { - dagStats.registerTaskRejected(taskInfo.assignedInstance.getHost()); + dagStats.registerTaskRejected(taskInfo.assignedNode.getHost()); } } if (endReason != null && endReason == TaskAttemptEndReason.NODE_FAILED) { LOG.info( - "Task {} ended on {} nodeInfo.toString() with a NODE_FAILED message." + - " An message should come in from the registry to disable this node unless" + + "Task {} ended on {} with a NODE_FAILED message." 
+ + " A message should come in from the registry to disable this node unless" + " this was a temporary communication failure", - task, assignedInstance); + task, nodeInfo.toShortString()); } boolean commFailure = endReason != null && endReason == TaskAttemptEndReason.COMMUNICATION_ERROR; - disableInstance(assignedInstance, commFailure); + disableNode(nodeInfo, commFailure); } } } finally { @@ -668,15 +677,6 @@ public boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEnd return true; } - private void reinsertNodeInfo(final NodeInfo nodeInfo) { - if ( disabledNodesQueue.remove(nodeInfo)) { - disabledNodesQueue.add(nodeInfo); - } - if (metrics != null) { - metrics.setDisabledNodeCount(disabledNodesQueue.size()); - } - } - @Override public Object deallocateContainer(ContainerId containerId) { LOG.debug("Ignoring deallocateContainer for containerId: " + containerId); @@ -730,11 +730,11 @@ private SelectHostResult selectHost(TaskInfo request) { if (nodeInfo.canAcceptTask()) { // Successfully scheduled. LOG.info( - "Assigning " + nodeToString(inst, nodeInfo) + " when looking for " + host + + "Assigning " + nodeInfo.toShortString() + " when looking for " + host + ". local=true" + " FirstRequestedHost=" + (prefHostCount == 0) + (requestedHosts.length > 1 ? ", #prefLocations=" + requestedHosts.length : "")); - return new SelectHostResult(inst, nodeInfo); + return new SelectHostResult(nodeInfo); } else { // The node cannot accept a task at the moment. if (shouldDelayForLocality) { @@ -755,8 +755,8 @@ private SelectHostResult selectHost(TaskInfo request) { } } else { LOG.warn( - "Null NodeInfo when attempting to get host with worker identity {}, and host {}", - inst.getWorkerIdentity(), host); + "Null NodeInfo when attempting to get host with worker {}, and host {}", + inst, host); // Leave requestedHostWillBecomeAvailable as is. If some other host is found - delay, // else ends up allocating to a random host immediately. 
} @@ -798,56 +798,46 @@ private SelectHostResult selectHost(TaskInfo request) { return SELECT_HOST_RESULT_DELAYED_RESOURCES; } NodeInfo randomNode = all.get(random.nextInt(all.size())); - LOG.info("Assigning " + nodeToString(randomNode.getServiceInstance(), randomNode) + LOG.info("Assigning " + randomNode.toShortString() + " when looking for any host, from #hosts=" + all.size() + ", requestedHosts=" + ((requestedHosts == null || requestedHosts.length == 0) ? "null" : Arrays.toString(requestedHosts))); - return new SelectHostResult(randomNode.getServiceInstance(), randomNode); + return new SelectHostResult(randomNode); } finally { readLock.unlock(); } } - private void scanForNodeChanges() { - /* check again whether nodes are disabled or just missing */ - writeLock.lock(); - try { - for (ServiceInstance inst : activeInstances.getAll()) { - if (inst.isAlive() && instanceToNodeMap.containsKey(inst.getWorkerIdentity()) == false) { - /* that's a good node, not added to the allocations yet */ - LOG.info("Found a new node: " + inst + "."); - addNode(inst, new NodeInfo(inst, nodeBlacklistConf, clock, numSchedulableTasksPerNode, - metrics)); - } - } - } finally { - writeLock.unlock(); - } - } - - private void addNode(ServiceInstance inst, NodeInfo node) { + private void addNode(NodeInfo node, ServiceInstance serviceInstance) { // we have just added a new node. Signal timeout monitor to reset timer if (activeInstances.size() == 1) { LOG.info("New node added. Signalling scheduler timeout monitor thread to stop timer."); stopTimeoutMonitor(); } - instanceToNodeMap.put(inst.getWorkerIdentity(), node); + + NodeReport nodeReport = constructNodeReport(serviceInstance, true); + getContext().nodesUpdated(Collections.singletonList(nodeReport)); + + instanceToNodeMap.put(node.getNodeIdentity(), node); if (metrics != null) { metrics.setClusterNodeCount(activeInstances.size()); } // Trigger scheduling since a new node became available. 
+ LOG.info("Adding new node: {}", node); trySchedulingPendingTasks(); } private void reenableDisabledNode(NodeInfo nodeInfo) { writeLock.lock(); try { - LOG.info("Attempting to re-enable node: " + nodeInfo.getServiceInstance()); - if (nodeInfo.getServiceInstance().isAlive()) { + LOG.info("Attempting to re-enable node: " + nodeInfo.toShortString()); + if (activeInstances.getInstance(nodeInfo.getNodeIdentity()) != null) { nodeInfo.enableNode(); } else { if (LOG.isInfoEnabled()) { - LOG.info("Removing dead node " + nodeInfo); + LOG.info( + "Not re-enabling node: {}, since it is not present in the RegistryActiveNodeList", + nodeInfo.toShortString()); } } } finally { @@ -855,13 +845,32 @@ private void reenableDisabledNode(NodeInfo nodeInfo) { } } - private void disableInstance(ServiceInstance instance, boolean isCommFailure) { + /** + * Updates relevant structures on the node, and fixes the position in the disabledNodeQueue + * to facilitate the actual re-enablement of the node. + * @param nodeInfo the node to be re-enabled + */ + private void queueNodeForReEnablement(final NodeInfo nodeInfo) { + nodeInfo.enableNode(); + if ( disabledNodesQueue.remove(nodeInfo)) { + disabledNodesQueue.add(nodeInfo); + } + if (metrics != null) { + metrics.setDisabledNodeCount(disabledNodesQueue.size()); + } + } + + private void disableNode(NodeInfo nodeInfo, boolean isCommFailure) { writeLock.lock(); try { - NodeInfo nodeInfo = instanceToNodeMap.get(instance.getWorkerIdentity()); if (nodeInfo == null || nodeInfo.isDisabled()) { if (LOG.isDebugEnabled()) { - LOG.debug("Node: " + instance + " already disabled, or invalid. Not doing anything."); + if (nodeInfo != null) { + LOG.debug("Node: " + nodeInfo.toShortString() + + " already disabled, or invalid. 
Not doing anything."); + } else { + LOG.debug("Ignoring disableNode invocation for null NodeInfo"); + } } } else { nodeInfo.disableNode(isCommFailure); @@ -879,6 +888,16 @@ private void disableInstance(ServiceInstance instance, boolean isCommFailure) { } } + private static NodeReport constructNodeReport(ServiceInstance serviceInstance, + boolean healthy) { + NodeReport nodeReport = NodeReport.newInstance(NodeId + .newInstance(serviceInstance.getHost(), serviceInstance.getRpcPort()), + healthy ? NodeState.RUNNING : NodeState.LOST, + serviceInstance.getServicesAddress(), null, null, + null, 0, "", 0L); + return nodeReport; + } + private void addPendingTask(TaskInfo taskInfo) { writeLock.lock(); try { @@ -1124,21 +1143,21 @@ private String constructPendingTaskCountsLogMessage() { private ScheduleResult scheduleTask(TaskInfo taskInfo) { SelectHostResult selectHostResult = selectHost(taskInfo); if (selectHostResult.scheduleResult == ScheduleResult.SCHEDULED) { - NodeServiceInstancePair nsPair = selectHostResult.nodeServiceInstancePair; + NodeInfo nodeInfo = selectHostResult.nodeInfo; Container container = containerFactory.createContainer(resourcePerExecutor, taskInfo.priority, - nsPair.getServiceInstance().getHost(), - nsPair.getServiceInstance().getRpcPort(), - nsPair.getServiceInstance().getServicesAddress()); + nodeInfo.getHost(), + nodeInfo.getRpcPort(), + nodeInfo.getServiceAddress()); writeLock.lock(); // While updating local structures try { - LOG.info("Assigned task {} to container {} on node={}", taskInfo, container.getId(), - nsPair.getServiceInstance()); + LOG.info("Assigned task={} on node={}, to container={}", + taskInfo, nodeInfo.toShortString(), container.getId()); dagStats.registerTaskAllocated(taskInfo.requestedHosts, taskInfo.requestedRacks, - nsPair.getServiceInstance().getHost()); - taskInfo.setAssignmentInfo(nsPair.getServiceInstance(), container.getId(), clock.getTime()); + nodeInfo.getHost()); + taskInfo.setAssignmentInfo(nodeInfo,
container.getId(), clock.getTime()); registerRunningTask(taskInfo); - nsPair.getNodeInfo().registerTaskScheduled(); + nodeInfo.registerTaskScheduled(); } finally { writeLock.unlock(); } @@ -1169,7 +1188,7 @@ private void preemptTasks(int forPriority, int numTasksToPreempt, String []poten Iterator taskInfoIterator = entryAtPriority.getValue().iterator(); while (taskInfoIterator.hasNext() && preemptedCount < numTasksToPreempt) { TaskInfo taskInfo = taskInfoIterator.next(); - if (preemptHosts == null || preemptHosts.contains(taskInfo.assignedInstance.getHost())) { + if (preemptHosts == null || preemptHosts.contains(taskInfo.assignedNode.getHost())) { // Candidate for preemption. preemptedCount++; LOG.info("preempting {} for task at priority {} with potentialHosts={}", taskInfo, @@ -1178,9 +1197,9 @@ private void preemptTasks(int forPriority, int numTasksToPreempt, String []poten if (preemptedTaskList == null) { preemptedTaskList = new LinkedList<>(); } - dagStats.registerTaskPreempted(taskInfo.assignedInstance.getHost()); + dagStats.registerTaskPreempted(taskInfo.assignedNode.getHost()); preemptedTaskList.add(taskInfo); - registerPendingPreemption(taskInfo.assignedInstance.getHost()); + registerPendingPreemption(taskInfo.assignedNode.getHost()); // Remove from the runningTaskList taskInfoIterator.remove(); } @@ -1256,14 +1275,6 @@ private void maybeAddToDelayedTaskQueue(TaskInfo taskInfo) { } } - private String nodeToString(ServiceInstance serviceInstance, NodeInfo nodeInfo) { - return serviceInstance.getHost() + ":" + serviceInstance.getRpcPort() + ", workerIdentity=" + - serviceInstance.getWorkerIdentity() + ", webAddress=" + - serviceInstance.getServicesAddress() + ", currentlyScheduledTasksOnNode=" + nodeInfo.numScheduledTasks; - } - - - // ------ Inner classes defined after this point ------ @VisibleForTesting @@ -1322,37 +1333,19 @@ DelayedTaskSchedulerCallable createDelayedTaskSchedulerCallable() { private class NodeEnablerCallable implements Callable { private 
final AtomicBoolean isShutdown = new AtomicBoolean(false); - private static final long REFRESH_INTERVAL = 10000l; - long nextPollInterval = REFRESH_INTERVAL; - long lastRefreshTime; + private static final long POLL_TIMEOUT = 10000L; @Override public Void call() { - lastRefreshTime = LlapTaskSchedulerService.this.clock.getTime(); while (!isShutdown.get() && !Thread.currentThread().isInterrupted()) { try { - while (true) { - NodeInfo nodeInfo = disabledNodesQueue.poll(nextPollInterval, TimeUnit.MILLISECONDS); - if (nodeInfo != null) { - long currentTime = LlapTaskSchedulerService.this.clock.getTime(); - // A node became available. Enable the node and try scheduling. - reenableDisabledNode(nodeInfo); - trySchedulingPendingTasks(); - - nextPollInterval -= (currentTime - lastRefreshTime); - } - - if (nextPollInterval < 0 || nodeInfo == null) { - // timeout expired. Reset the poll interval and refresh nodes. - nextPollInterval = REFRESH_INTERVAL; - lastRefreshTime = LlapTaskSchedulerService.this.clock.getTime(); - // TODO Get rid of this polling once we have notificaitons from the registry sub-system - if (LOG.isDebugEnabled()) { - LOG.debug("Refreshing instances based on poll interval"); - } - scanForNodeChanges(); - } + NodeInfo nodeInfo = + disabledNodesQueue.poll(POLL_TIMEOUT, TimeUnit.MILLISECONDS); + if (nodeInfo != null) { + // A node became available. Enable the node and try scheduling. + reenableDisabledNode(nodeInfo); + trySchedulingPendingTasks(); } } catch (InterruptedException e) { if (isShutdown.get()) { @@ -1393,8 +1386,9 @@ public void run() { getContext().reportError(ServicePluginErrorDefaults.SERVICE_UNAVAILABLE, "No LLAP Daemons are running", getContext().getCurrentDagInfo()); } catch (Exception e) { + DagInfo currentDagInfo = getContext().getCurrentDagInfo(); LOG.error("Exception when reporting SERVICE_UNAVAILABLE error for dag: {}", - getContext().getCurrentDagInfo().getName(), e); + currentDagInfo == null ? 
"" : currentDagInfo.getName(), e); } } } @@ -1505,8 +1499,20 @@ public void shutdown() { } - ServiceInstance getServiceInstance() { - return serviceInstance; + String getNodeIdentity() { + return serviceInstance.getWorkerIdentity(); + } + + String getHost() { + return serviceInstance.getHost(); + } + + int getRpcPort() { + return serviceInstance.getRpcPort(); + } + + String getServiceAddress() { + return serviceInstance.getServicesAddress(); } void enableNode() { @@ -1535,7 +1541,8 @@ void disableNode(boolean commFailure) { delayTime = blacklistConf.maxDelay; } if (LOG.isInfoEnabled()) { - LOG.info("Disabling instance {} for {} milli-seconds. commFailure={}", serviceInstance, + LOG.info("Disabling instance {} for {} milli-seconds. commFailure={}", + serviceInstance, delayTime, commFailure); } expireTimeMillis = currentTime + delayTime; @@ -1577,7 +1584,7 @@ void registerUnsuccessfulTaskEnd(boolean wasPreempted) { /** * @return the time at which this node will be re-enabled */ - public long getEnableTime() { + long getEnableTime() { return expireTimeMillis; } @@ -1585,22 +1592,21 @@ public boolean isDisabled() { return disabled; } - public boolean hadCommFailure() { + boolean hadCommFailure() { return hadCommFailure; } /* Returning true does not guarantee that the task will run, considering other queries may be running in the system. 
Also depends upon the capacity usage configuration */ - public boolean canAcceptTask() { - boolean result = !hadCommFailure && !disabled && serviceInstance.isAlive() + boolean canAcceptTask() { + boolean result = !hadCommFailure && !disabled &&(numSchedulableTasks == -1 || ((numSchedulableTasks - numScheduledTasks) > 0)); if (LOG.isInfoEnabled()) { LOG.info("Node[" + serviceInstance.getHost() + ":" + serviceInstance.getRpcPort() + ", " + serviceInstance.getWorkerIdentity() + "]: " + - "canAcceptTask={}, numScheduledTasks={}, numSchedulableTasks={}, hadCommFailure={}, disabled={}, serviceInstance.isAlive={}", - result, numScheduledTasks, numSchedulableTasks, hadCommFailure, disabled, - serviceInstance.isAlive()); + "canAcceptTask={}, numScheduledTasks={}, numSchedulableTasks={}, hadCommFailure={}, disabled={}", + result, numScheduledTasks, numSchedulableTasks, hadCommFailure, disabled); } return result; } @@ -1634,6 +1640,13 @@ public String toString() { + ", commFailures=" + hadCommFailure +'}'; } + + private String toShortString() { + return "{" + serviceInstance.getHost() + ":" + + serviceInstance.getRpcPort() + ", id=" + getNodeIdentity() + + ", stc=" + numSchedulableTasks + "}"; + } + } @VisibleForTesting @@ -1699,6 +1712,7 @@ void registerTaskAllocated(String[] requestedHosts, String[] requestedRacks, _registerAllocationInHostMap(allocatedHost, numAllocationsPerHost); } + // TODO Track stats of rejections etc per host void registerTaskPreempted(String host) { numPreemptedTasks++; } @@ -1753,7 +1767,7 @@ private void _registerAllocationInHostMap(String host, Map