diff --git llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java
index 8c5c3e4..f03c807 100644
--- llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java
+++ llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java
@@ -29,6 +29,8 @@ public LlapConfiguration() {

   }

+  public static final String LLAP_PREFIX = "llap.";
+
   public static final String LLAP_DAEMON_PREFIX = "llap.daemon.";

   private static final String LLAP_DAEMON_SITE = "llap-daemon-site.xml";
@@ -74,12 +76,38 @@ public LlapConfiguration() {
   public static final int LLAP_DAEMON_COMMUNICATOR_NUM_THREADS_DEFAULT = 5;

   /**
-   * Time after which a previously disabled node will be re-enabled for scheduling. This may be
-   * modified by an exponential back-off if failures persist
+   * Minimum time after which a previously disabled node will be re-enabled for scheduling. This
+   * may be modified by an exponential back-off if failures persist
+   */
+  public static final String LLAP_TASK_SCHEDULER_NODE_REENABLE_MIN_TIMEOUT_MILLIS =
+      LLAP_PREFIX + "task.scheduler.node.re-enable.min.timeout.ms";
+  public static final long LLAP_TASK_SCHEDULER_NODE_REENABLE_MIN_TIMEOUT_MILLIS_DEFAULT = 200l;
+
+  /**
+   * Maximum time after which a previously disabled node will be re-enabled for scheduling. This
+   * may be modified by an exponential back-off if failures persist
+   */
+  public static final String LLAP_TASK_SCHEDULER_NODE_REENABLE_MAX_TIMEOUT_MILLIS =
+      LLAP_PREFIX + "task.scheduler.node.re-enable.max.timeout.ms";
+  public static final long LLAP_TASK_SCHEDULER_NODE_REENABLE_MAX_TIMEOUT_MILLIS_DEFAULT = 10000l;
+
+  /**
+   * Backoff factor on successive blacklists of a node. Blacklist timeouts start at the min timeout
+   * and go up to the max timeout based on this backoff factor
+   */
+  public static final String LLAP_TASK_SCHEDULER_NODE_DISABLE_BACK_OFF_FACTOR =
+      LLAP_PREFIX + "task.scheduler.node.disable.backoff.factor";
+  public static final float LLAP_TASK_SCHEDULER_NODE_DISABLE_BACK_OFF_FACTOR_DEFAULT = 1.5f;
+
+  /**
+   * The number of tasks the AM TaskScheduler will try allocating per node.
+   * 0 indicates that this should be picked up from the Registry.
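The three re-enable settings above combine into a capped geometric backoff: a node's first blacklist lasts the min timeout, every repeat blacklist without an intervening task success multiplies the delay by the backoff factor, and the max timeout caps the result. A minimal standalone sketch of the resulting delay sequence under the defaults (the constant values are taken from this patch; the computation mirrors NodeInfo.disableNode() later in the diff):

    public class NodeReenableBackoffSketch {
      public static void main(String[] args) {
        long minDelayMs = 200L;     // ...NODE_REENABLE_MIN_TIMEOUT_MILLIS_DEFAULT
        long maxDelayMs = 10000L;   // ...NODE_REENABLE_MAX_TIMEOUT_MILLIS_DEFAULT
        float backoffFactor = 1.5f; // ...NODE_DISABLE_BACK_OFF_FACTOR_DEFAULT

        float cumulativeBackoffFactor = 1.0f;
        // Successive blacklists with no successful task in between keep growing the exponent.
        for (int blacklistCount = 1; blacklistCount <= 12; blacklistCount++) {
          long delayMs = Math.min((long) (minDelayMs * cumulativeBackoffFactor), maxDelayMs);
          System.out.println("blacklist #" + blacklistCount + " -> re-enable after " + delayMs + " ms");
          cumulativeBackoffFactor *= backoffFactor;
        }
        // Prints 200, 300, 450, 675, ... and saturates at 10000 ms.
      }
    }

With num.schedulable.tasks.per.node left at its default of 0, per-node capacity is auto-detected instead, as the daemon's vcores plus its wait queue size (see the NodeInfo constructor further down).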
+   * -1 indicates unlimited capacity
+   * >0 indicates a specific bound
    */
-  public static final String LLAP_DAEMON_TASK_SCHEDULER_NODE_REENABLE_TIMEOUT_MILLIS =
-      LLAP_DAEMON_PREFIX + "task.scheduler.node.re-enable.timeout.ms";
-  public static final long LLAP_DAEMON_TASK_SCHEDULER_NODE_REENABLE_TIMEOUT_MILLIS_DEFAULT = 2000l;
+  public static final String LLAP_TASK_SCHEDULER_NUM_SCHEDULABLE_TASKS_PER_NODE =
+      LLAP_PREFIX + "task.scheduler.num.schedulable.tasks.per.node";
+  public static final int LLAP_TASK_SCHEDULER_NUM_SCHEDULABLE_TASKS_PER_NODE_DEFAULT = 0;

   public static final String LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE =
       LLAP_DAEMON_PREFIX + "task.scheduler.wait.queue.size";
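The registry change below makes the fixed registry propagate plain llap.*-prefixed keys (such as the new task-scheduler settings) into each service record, alongside the llap.daemon.* and hive.llap.* keys it already copied. A small self-contained sketch of the same prefix filter, using a plain Map where the real constructor iterates a Hadoop Configuration (srv here is a stand-in for the service record properties):

    import java.util.HashMap;
    import java.util.Map;

    public class PrefixFilterSketch {
      public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("llap.task.scheduler.node.re-enable.min.timeout.ms", "200");
        conf.put("llap.daemon.task.scheduler.wait.queue.size", "10");
        conf.put("hive.exec.parallel", "true"); // unrelated key; must not be published

        Map<String, String> srv = new HashMap<>(); // stand-in for the service record
        for (Map.Entry<String, String> kv : conf.entrySet()) {
          if (kv.getKey().startsWith("llap.daemon.")
              || kv.getKey().startsWith("hive.llap.")
              || kv.getKey().startsWith("llap.")) {
            srv.put(kv.getKey(), kv.getValue());
          }
        }
        System.out.println(srv); // both llap.* keys survive; hive.exec.parallel does not
      }
    }

Note that llap.daemon.* keys also match the broader llap. prefix, so the first clause becomes redundant, though harmless.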
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapFixedRegistryImpl.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapFixedRegistryImpl.java
index c600e74..cdc3930 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapFixedRegistryImpl.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapFixedRegistryImpl.java
@@ -55,7 +55,8 @@ public LlapFixedRegistryImpl(String hosts, Configuration conf) {
     for (Map.Entry<String, String> kv : conf) {
       if (kv.getKey().startsWith(LlapConfiguration.LLAP_DAEMON_PREFIX)
-          || kv.getKey().startsWith("hive.llap.")) {
+          || kv.getKey().startsWith("hive.llap.")
+          || kv.getKey().startsWith(LlapConfiguration.LLAP_PREFIX)) {
         // TODO: read this somewhere useful, like the task scheduler
         srv.put(kv.getKey(), kv.getValue());
       }
@@ -152,6 +153,14 @@ public Resource getResource() {
       return Resource.newInstance(memory, vcores);
     }

+    @Override
+    public String toString() {
+      return "FixedServiceInstance{" +
+          "host=" + host +
+          ", memory=" + memory +
+          ", vcores=" + vcores +
+          '}';
+    }
   }

   private final class FixedServiceInstanceSet implements ServiceInstanceSet {
diff --git llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
index 39ad552..3a827c3 100644
--- llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
+++ llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
@@ -39,6 +39,9 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;

 import org.apache.commons.logging.Log;
@@ -68,8 +71,6 @@
   private static final Log LOG = LogFactory.getLog(LlapTaskSchedulerService.class);

-  private static final float BACKOFF_FACTOR = 1.2f;
-
   private final ExecutorService appCallbackExecutor;
   private final TaskSchedulerAppCallback appClientDelegate;
@@ -101,36 +102,39 @@ public int compare(Priority o1, Priority o2) {
   private final ContainerFactory containerFactory;
   private final Random random = new Random();
   private final Clock clock;
-  private final ListeningExecutorService executor;
+
+  private final ListeningExecutorService nodeEnabledExecutor;
+  private final NodeEnablerCallable nodeEnablerCallable =
+      new NodeEnablerCallable();
+
   private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
   private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
   private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
-  // TODO Track resources used by this application on specific hosts, and make scheduling decisions
-  // accordingly.
-  // Ideally implement in a way where updates from ZK, if they do come, can just be plugged in.
-  // A heap based on available capacity - which is updated each time stats are updated,
-  // or anytime assignment numbers are changed. Especially for random allocations (no host request).
-  // For non-random allocations - Walk through all pending tasks to get local assignments, then
-  // start assigning them to non local hosts.
-  // Also setup a max over-subscribe limit as part of this.
+  private final Lock scheduleLock = new ReentrantLock();
+  private final Condition scheduleCondition = scheduleLock.newCondition();
+  private final ListeningExecutorService schedulerExecutor;
+  private final SchedulerCallable schedulerCallable = new SchedulerCallable();

   private final AtomicBoolean isStopped = new AtomicBoolean(false);
-  private final long nodeReEnableTimeout;
+  private final NodeBlacklistConf nodeBlacklistConf;

   // Per daemon
   private final int memoryPerInstance;
   private final int coresPerInstance;
   private final int executorsPerInstance;
+  private final int numSchedulableTasksPerNode;
+
   // Per Executor Thread
   private final Resource resourcePerExecutor;

   private final LlapRegistryService registry = new LlapRegistryService();
-  private final PendingTaskSchedulerCallable pendingTaskSchedulerCallable =
-      new PendingTaskSchedulerCallable();
-  private ListenableFuture<Void> pendingTaskSchedulerFuture;
+
+  private volatile ListenableFuture<Void> nodeEnablerFuture;
+  private volatile ListenableFuture<Void> schedulerFuture;

   @VisibleForTesting
   private final AtomicInteger dagCounter = new AtomicInteger(1);
@@ -158,9 +162,17 @@ public LlapTaskSchedulerService(TaskSchedulerAppCallback appClient, AppContext a
     this.executorsPerInstance = conf.getInt(LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS,
         LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS_DEFAULT);
-    this.nodeReEnableTimeout =
-        conf.getLong(LlapConfiguration.LLAP_DAEMON_TASK_SCHEDULER_NODE_REENABLE_TIMEOUT_MILLIS,
-            LlapConfiguration.LLAP_DAEMON_TASK_SCHEDULER_NODE_REENABLE_TIMEOUT_MILLIS_DEFAULT);
+    this.nodeBlacklistConf = new NodeBlacklistConf(
+        conf.getLong(LlapConfiguration.LLAP_TASK_SCHEDULER_NODE_REENABLE_MIN_TIMEOUT_MILLIS,
+            LlapConfiguration.LLAP_TASK_SCHEDULER_NODE_REENABLE_MIN_TIMEOUT_MILLIS_DEFAULT),
+        conf.getLong(LlapConfiguration.LLAP_TASK_SCHEDULER_NODE_REENABLE_MAX_TIMEOUT_MILLIS,
+            LlapConfiguration.LLAP_TASK_SCHEDULER_NODE_REENABLE_MAX_TIMEOUT_MILLIS_DEFAULT),
+        conf.getFloat(LlapConfiguration.LLAP_TASK_SCHEDULER_NODE_DISABLE_BACK_OFF_FACTOR,
+            LlapConfiguration.LLAP_TASK_SCHEDULER_NODE_DISABLE_BACK_OFF_FACTOR_DEFAULT));
+
+    this.numSchedulableTasksPerNode = conf.getInt(
+        LlapConfiguration.LLAP_TASK_SCHEDULER_NUM_SCHEDULABLE_TASKS_PER_NODE,
+        LlapConfiguration.LLAP_TASK_SCHEDULER_NUM_SCHEDULABLE_TASKS_PER_NODE_DEFAULT);

     int memoryPerExecutor = (int) (memoryPerInstance / (float) executorsPerInstance);
     int coresPerExecutor = (int) (coresPerInstance / (float) executorsPerInstance);
@@ -171,16 +183,19 @@ public LlapTaskSchedulerService(TaskSchedulerAppCallback appClient, AppContext a
     Preconditions.checkNotNull(instanceId, LlapConfiguration.LLAP_DAEMON_SERVICE_HOSTS
         + " must be defined");

-    ExecutorService executorService =
+    ExecutorService executorServiceRaw =
         Executors.newFixedThreadPool(1,
-            new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LlapScheduler").build());
-    executor = MoreExecutors.listeningDecorator(executorService);
+            new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LlapSchedulerNodeEnabler").build());
+    nodeEnabledExecutor = MoreExecutors.listeningDecorator(executorServiceRaw);
+
+    ExecutorService schedulerExecutorServiceRaw = Executors.newFixedThreadPool(1,
+        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LlapScheduler").build());
+    schedulerExecutor = MoreExecutors.listeningDecorator(schedulerExecutorServiceRaw);

     LOG.info("Running with configuration: " + "memoryPerInstance=" + memoryPerInstance
         + ", vCoresPerInstance=" + coresPerInstance + ", executorsPerInstance="
         + executorsPerInstance + ", resourcePerInstanceInferred=" + resourcePerExecutor
-        + ", nodeReEnableTimeout=" + nodeReEnableTimeout + ", nodeReEnableBackOffFactor="
-        + BACKOFF_FACTOR);
+        + ", nodeBlacklistConf=" + nodeBlacklistConf);
   }

   @Override
@@ -192,11 +207,12 @@ public void serviceInit(Configuration conf) {
   public void serviceStart() throws IOException {
     writeLock.lock();
     try {
-      pendingTaskSchedulerFuture = executor.submit(pendingTaskSchedulerCallable);
+      nodeEnablerFuture = nodeEnabledExecutor.submit(nodeEnablerCallable);
+      schedulerFuture = schedulerExecutor.submit(schedulerCallable);
       registry.start();
       activeInstances = registry.getInstances();
       for (ServiceInstance inst : activeInstances.getAll().values()) {
-        addNode(inst, new NodeInfo(inst, BACKOFF_FACTOR, clock));
+        addNode(inst, new NodeInfo(inst, nodeBlacklistConf, clock, numSchedulableTasksPerNode));
       }
     } finally {
       writeLock.unlock();
     }
@@ -208,11 +224,18 @@ public void serviceStop() {
     writeLock.lock();
     try {
       if (!this.isStopped.getAndSet(true)) {
-        pendingTaskSchedulerCallable.shutdown();
-        if (pendingTaskSchedulerFuture != null) {
-          pendingTaskSchedulerFuture.cancel(true);
+        nodeEnablerCallable.shutdown();
+        if (nodeEnablerFuture != null) {
+          nodeEnablerFuture.cancel(true);
         }
-        executor.shutdownNow();
+        nodeEnabledExecutor.shutdownNow();
+
+        schedulerCallable.shutdown();
+        if (schedulerFuture != null) {
+          schedulerFuture.cancel(true);
+        }
+        schedulerExecutor.shutdownNow();
+
         if (registry != null) {
           registry.stop();
         }
@@ -232,7 +255,7 @@ public Resource getTotalResources() {
       for (ServiceInstance inst : activeInstances.getAll().values()) {
         if (inst.isAlive()) {
           Resource r = inst.getResource();
-          LOG.info("Found instance " + inst + " with " + r);
+          LOG.info("Found instance " + inst);
           memory += r.getMemory();
           vcores += r.getVirtualCores();
         } else {
@@ -317,10 +340,8 @@ public void allocateTask(Object task, Resource capability, String[] hosts, Strin
     } finally {
       writeLock.unlock();
     }
-    boolean scheduled = scheduleTask(taskInfo);
-    if (!scheduled) {
-      addPendingTask(taskInfo);
-    }
+    addPendingTask(taskInfo);
+    trySchedulingPendingTasks();
   }

   @Override
@@ -336,10 +357,8 @@ public void allocateTask(Object task, Resource capability, ContainerId container
     } finally {
       writeLock.unlock();
     }
-    boolean scheduled = scheduleTask(taskInfo);
-    if (!scheduled) {
-      addPendingTask(taskInfo);
-    }
+    addPendingTask(taskInfo);
+    trySchedulingPendingTasks();
   }

   // This may be invoked before a container is ever assigned to a task. allocateTask... app decides
@@ -373,25 +392,39 @@ public boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEnd
       ServiceInstance assignedInstance = taskInfo.assignedInstance;
       assert assignedInstance != null;
+      NodeInfo nodeInfo = instanceToNodeMap.get(assignedInstance);
+      assert nodeInfo != null;
+
       if (taskSucceeded) {
         // The node may have been blacklisted at this point - which means it may not be in the
         // activeNodeList.
-        NodeInfo nodeInfo = instanceToNodeMap.get(assignedInstance);
-        assert nodeInfo != null;
+
         nodeInfo.registerTaskSuccess();
-        // TODO Consider un-blacklisting the node since at least 1 slot should have become available
-        // on the node.
-      } else if (!taskSucceeded
-          && endReason != null
-          && EnumSet
-              .of(TaskAttemptEndReason.SERVICE_BUSY, TaskAttemptEndReason.COMMUNICATION_ERROR)
-              .contains(endReason)) {
-        if (endReason == TaskAttemptEndReason.COMMUNICATION_ERROR) {
-          dagStats.registerCommFailure(taskInfo.assignedInstance.getHost());
-        } else if (endReason == TaskAttemptEndReason.SERVICE_BUSY) {
-          dagStats.registerTaskRejected(taskInfo.assignedInstance.getHost());
+
+        if (nodeInfo.isDisabled()) {
+          // Re-enable the node. If a task succeeded, a slot may have become available.
+          // Also reset commFailures since a task was able to communicate back and indicate success.
+          nodeInfo.enableNode();
+          // Re-insert into the queue to force the poll thread to remove the element.
+          if (disabledNodesQueue.remove(nodeInfo)) {
+            disabledNodesQueue.add(nodeInfo);
+          }
         }
-        disableInstance(assignedInstance, endReason == TaskAttemptEndReason.SERVICE_BUSY);
+        // In case of success, trigger a scheduling run for pending tasks.
+        trySchedulingPendingTasks();
+
+      } else if (!taskSucceeded) {
+        nodeInfo.registerUnsuccessfulTaskEnd();
+        if (endReason != null && EnumSet
+            .of(TaskAttemptEndReason.SERVICE_BUSY, TaskAttemptEndReason.COMMUNICATION_ERROR)
+            .contains(endReason)) {
+          if (endReason == TaskAttemptEndReason.COMMUNICATION_ERROR) {
+            dagStats.registerCommFailure(taskInfo.assignedInstance.getHost());
+          } else if (endReason == TaskAttemptEndReason.SERVICE_BUSY) {
+            dagStats.registerTaskRejected(taskInfo.assignedInstance.getHost());
+          }
+        }
+        boolean commFailure = endReason != null && endReason == TaskAttemptEndReason.COMMUNICATION_ERROR;
+        disableInstance(assignedInstance, commFailure);
       }
     } finally {
       writeLock.unlock();
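The remove-and-re-add above deals with a DelayQueue subtlety: the queue orders elements by the deadline they had when inserted, and a thread already blocked polling the queue keeps waiting on that old deadline even after enableNode() clears it. Re-inserting the element re-sorts it and wakes the waiting poller. A self-contained sketch of the same pattern (the Expiring class is illustrative, not from the patch; NodeInfo plays this role there):

    import java.util.concurrent.DelayQueue;
    import java.util.concurrent.Delayed;
    import java.util.concurrent.TimeUnit;

    public class DelayQueueReinsertSketch {
      static class Expiring implements Delayed {
        volatile long expireAtMillis;
        Expiring(long expireAtMillis) { this.expireAtMillis = expireAtMillis; }
        @Override public long getDelay(TimeUnit unit) {
          return unit.convert(expireAtMillis - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }
        @Override public int compareTo(Delayed o) {
          return Long.compare(getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
        }
      }

      public static void main(String[] args) throws InterruptedException {
        DelayQueue<Expiring> queue = new DelayQueue<>();
        Expiring node = new Expiring(System.currentTimeMillis() + 60_000); // "blacklisted" for a minute
        queue.add(node);

        // A thread blocked on the old deadline, like the NodeEnabler thread in the patch.
        Thread poller = new Thread(() -> {
          try {
            Expiring e = queue.take();
            System.out.println("woke up, remaining delay " + e.getDelay(TimeUnit.MILLISECONDS) + " ms");
          } catch (InterruptedException ignored) {
          }
        });
        poller.start();
        Thread.sleep(100); // let the poller park on the 60s deadline

        // A task succeeded: expire the entry now. Mutating the deadline alone does not wake a
        // blocked consumer - it keeps waiting on the delay it captured. Removing and re-adding
        // re-sorts the element and signals the queue, so the poller returns promptly.
        node.expireAtMillis = System.currentTimeMillis();
        if (queue.remove(node)) {
          queue.add(node);
        }
        poller.join(5000); // completes almost immediately rather than after a minute
      }
    }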
@@ -433,7 +466,7 @@ TaskSchedulerAppCallback createAppCallbackDelegate(TaskSchedulerAppCallback real
    * @param request the list of preferred hosts. null implies any host
    * @return
    */
-  private ServiceInstance selectHost(TaskInfo request) {
+  private NodeServiceInstancePair selectHost(TaskInfo request) {
     String[] requestedHosts = request.requestedHosts;
     readLock.lock(); // Read-lock. Not updating any stats at the moment.
     try {
@@ -459,91 +492,81 @@ private ServiceInstance selectHost(TaskInfo request) {
         if (!instances.isEmpty()) {
           for (ServiceInstance inst : instances) {
             NodeInfo nodeInfo = instanceToNodeMap.get(inst);
-            if (inst.isAlive() && nodeInfo != null && !nodeInfo.isDisabled()) {
-              // TODO Change this to work off of what we think is remaining capacity for an
-              // instance
-              LOG.info(
-                  "Assigning " + inst + " when looking for " + host + ". FirstRequestedHost=" +
-                      (prefHostCount == 0));
-              return inst;
+            if (nodeInfo != null && nodeInfo.canAcceptTask()) {
+              LOG.info("Assigning " + inst + " when looking for " + host + "."
+ + " FirstRequestedHost=" + (prefHostCount == 0)); + return new NodeServiceInstancePair(inst, nodeInfo); } } } } } /* fall through - miss in locality (random scheduling) */ - Entry [] all = instanceToNodeMap.entrySet().toArray(new Entry[instanceToNodeMap.size()]); + Entry[] all = + instanceToNodeMap.entrySet().toArray(new Entry[instanceToNodeMap.size()]); // Check again if (all.length > 0) { int n = random.nextInt(all.length); // start at random offset and iterate whole list for (int i = 0; i < all.length; i++) { Entry inst = all[(i + n) % all.length]; - if (inst.getKey().isAlive() && !inst.getValue().isDisabled()) { + if (inst.getValue().canAcceptTask()) { LOG.info("Assigning " + inst + " when looking for any host, from #hosts=" + all.length); - return inst.getKey(); + return new NodeServiceInstancePair(inst.getKey(), inst.getValue()); } } } + return null; } finally { readLock.unlock(); } + } - // TODO Ideally, each refresh operation should addNodes if they don't already exist. - // Even better would be to get notifications from the service impl when a node gets added or removed. - // Instead of having to walk through the entire list. The computation of a node getting added or - // removed already exists in the DynamicRegistry implementation. - - - // This will only happen if no allocations are possible, which means all other nodes have - // been blacklisted. - // TODO Look for new nodes more often. See comment above. + // TODO Each refresh operation should addNodes if they don't already exist. + // Even better would be to get notifications from the service impl when a node gets added or removed. + // Instead of having to walk through the entire list. The computation of a node getting added or + // removed already exists in the DynamicRegistry implementation. + private void refreshInstances() { + try { + activeInstances.refresh(); // handles its own sync + } catch (IOException ioe) { + LOG.warn("Could not refresh list of active instances", ioe); + } + } + private void scanForNodeChanges() { /* check again whether nodes are disabled or just missing */ writeLock.lock(); try { for (ServiceInstance inst : activeInstances.getAll().values()) { if (inst.isAlive() && instanceToNodeMap.containsKey(inst) == false) { /* that's a good node, not added to the allocations yet */ - LOG.info("Found a new node: " + inst + ". Adding to node list and disabling to trigger scheduling"); - addNode(inst, new NodeInfo(inst, BACKOFF_FACTOR, clock)); - // mark it as disabled to let the pending tasks go there - // TODO If disabling the instance, have it wake up immediately instead of waiting. - // Ideally get rid of this requirement, by having all tasks allocated via a queue. - disableInstance(inst, true); + LOG.info("Found a new node: " + inst + "."); + addNode(inst, new NodeInfo(inst, nodeBlacklistConf, clock, numSchedulableTasksPerNode)); } } - /* do not allocate nodes from this process, as then the pending tasks will get starved */ } finally { writeLock.unlock(); } - return null; - } - - private void refreshInstances() { - try { - activeInstances.refresh(); // handles its own sync - } catch (IOException ioe) { - LOG.warn("Could not refresh list of active instances", ioe); - } } private void addNode(ServiceInstance inst, NodeInfo node) { LOG.info("Adding node: " + inst); instanceToNodeMap.put(inst, node); - // TODO Trigger a scheduling run each time a new node is added. + // Trigger scheduling since a new node became available. 
@@ -555,7 +578,7 @@ private void reenableDisabledNode(NodeInfo nodeInfo) {
     }
   }

-  private void disableInstance(ServiceInstance instance, boolean busy) {
+  private void disableInstance(ServiceInstance instance, boolean isCommFailure) {
     writeLock.lock();
     try {
       NodeInfo nodeInfo = instanceToNodeMap.get(instance);
@@ -564,13 +587,9 @@ private void disableInstance(ServiceInstance instance, boolean busy) {
           LOG.debug("Node: " + instance + " already disabled, or invalid. Not doing anything.");
         }
       } else {
-        nodeInfo.disableNode(nodeReEnableTimeout);
-        nodeInfo.setBusy(busy); // daemon failure vs daemon busy
+        nodeInfo.disableNode(isCommFailure);
         // TODO: handle task to container map events in case of hard failures
         disabledNodesQueue.add(nodeInfo);
-        if (LOG.isInfoEnabled()) {
-          LOG.info("Disabling instance " + instance + " for " + nodeReEnableTimeout + " milli-seconds");
-        }
       }
     } finally {
       writeLock.unlock();
@@ -580,7 +599,6 @@ private void disableInstance(ServiceInstance instance, boolean busy) {
   private void addPendingTask(TaskInfo taskInfo) {
     writeLock.lock();
     try {
-      dagStats.registerDelayedAllocation();
       List<TaskInfo> tasksAtPriority = pendingTasks.get(taskInfo.priority);
       if (tasksAtPriority == null) {
         tasksAtPriority = new LinkedList<>();
@@ -607,7 +625,8 @@ private void removePendingTask(TaskInfo taskInfo) {
     }
   }

-  private void schedulePendingTasks() {
+  @VisibleForTesting
+  protected void schedulePendingTasks() {
     writeLock.lock();
     try {
       Iterator<Entry<Priority, List<TaskInfo>>> pendingIterator =
@@ -618,7 +637,15 @@ private void schedulePendingTasks() {
         Iterator<TaskInfo> taskIter = taskListAtPriority.iterator();
         boolean scheduledAllAtPriority = true;
         while (taskIter.hasNext()) {
+
+          // TODO Optimization: Add a check to see if there's any capacity available. No point in
+          // walking through all active nodes if they don't have potential capacity.
+
           TaskInfo taskInfo = taskIter.next();
+          if (taskInfo.getNumPreviousAssignAttempts() == 1) {
+            dagStats.registerDelayedAllocation();
+          }
+          taskInfo.triedAssigningTask();
           boolean scheduled = scheduleTask(taskInfo);
           if (scheduled) {
             taskIter.remove();
@@ -642,20 +669,23 @@ private void schedulePendingTasks() {
     }
   }

+
   private boolean scheduleTask(TaskInfo taskInfo) {
-    ServiceInstance host = selectHost(taskInfo);
-    if (host == null) {
+    NodeServiceInstancePair nsPair = selectHost(taskInfo);
+    if (nsPair == null) {
       return false;
     } else {
       Container container =
-          containerFactory.createContainer(resourcePerExecutor, taskInfo.priority, host.getHost(),
-              host.getRpcPort());
+          containerFactory.createContainer(resourcePerExecutor, taskInfo.priority,
+              nsPair.getServiceInstance().getHost(),
+              nsPair.getServiceInstance().getRpcPort());
       writeLock.lock(); // While updating local structures
       try {
         dagStats.registerTaskAllocated(taskInfo.requestedHosts, taskInfo.requestedRacks,
-            host.getHost());
-        taskInfo.setAssignmentInfo(host, container.getId());
+            nsPair.getServiceInstance().getHost());
+        taskInfo.setAssignmentInfo(nsPair.getServiceInstance(), container.getId());
         knownTasks.putIfAbsent(taskInfo.task, taskInfo);
+        nsPair.getNodeInfo().registerTaskScheduled();
       } finally {
         writeLock.unlock();
       }
@@ -665,30 +695,92 @@ private boolean scheduleTask(TaskInfo taskInfo) {
     }
   }

-  private class PendingTaskSchedulerCallable implements Callable<Void> {
+  private class NodeEnablerCallable implements Callable<Void> {

     private AtomicBoolean isShutdown = new AtomicBoolean(false);
+    private static final long REFRESH_INTERVAL = 10000l;
+    long nextPollInterval = REFRESH_INTERVAL;
+    long lastRefreshTime;

     @Override
     public Void call() {
+      lastRefreshTime = System.currentTimeMillis();
       while (!isShutdown.get() && !Thread.currentThread().isInterrupted()) {
         try {
           while (true) {
-            NodeInfo nodeInfo = disabledNodesQueue.take();
-            // A node became available. Enable the node and try scheduling.
-            reenableDisabledNode(nodeInfo);
-            schedulePendingTasks();
+            NodeInfo nodeInfo = disabledNodesQueue.poll(nextPollInterval, TimeUnit.MILLISECONDS);
+            if (nodeInfo != null) {
+              long currentTime = System.currentTimeMillis();
+              // A node became available. Enable the node and try scheduling.
+              reenableDisabledNode(nodeInfo);
+              trySchedulingPendingTasks();
+
+              nextPollInterval -= (currentTime - lastRefreshTime);
+            }
+
+            if (nextPollInterval < 0 || nodeInfo == null) {
+              // Timeout expired. Reset the poll interval and refresh nodes.
+              nextPollInterval = REFRESH_INTERVAL;
+              lastRefreshTime = System.currentTimeMillis();
+              // TODO Get rid of this polling once we have notifications from the registry sub-system
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Refreshing instances based on poll interval");
+              }
+              refreshInstances();
+              scanForNodeChanges();
+            }
+          }
+        } catch (InterruptedException e) {
+          if (isShutdown.get()) {
+            LOG.info("NodeEnabler thread interrupted after shutdown");
+            break;
+          } else {
+            LOG.warn("NodeEnabler thread interrupted without being shutdown");
+            throw new RuntimeException("NodeEnabler thread interrupted without being shutdown", e);
+          }
+        }
+      }
+      return null;
+    }
+
+    // Call this first, then send in an interrupt to the thread.
+    public void shutdown() {
+      isShutdown.set(true);
+    }
+  }
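NodeEnablerCallable multiplexes two jobs onto one thread: re-enabling blacklisted nodes as their delay expires, and refreshing the registry roughly every REFRESH_INTERVAL even when no node is queued. A condensed sketch of just the timing logic (names mirror the patch; the work done on each branch is stubbed out):

    import java.util.concurrent.DelayQueue;
    import java.util.concurrent.Delayed;
    import java.util.concurrent.TimeUnit;

    public class PollLoopSketch {
      static final long REFRESH_INTERVAL = 10000L;

      static <T extends Delayed> void loop(DelayQueue<T> disabledNodesQueue)
          throws InterruptedException {
        long nextPollInterval = REFRESH_INTERVAL;
        long lastRefreshTime = System.currentTimeMillis();
        while (true) {
          // Wait for the next blacklisted node to expire, but never longer than the
          // remaining refresh budget.
          T nodeInfo = disabledNodesQueue.poll(nextPollInterval, TimeUnit.MILLISECONDS);
          if (nodeInfo != null) {
            // A node's blacklist expired: re-enable it (stubbed), then shrink the budget
            // left before the next registry refresh.
            nextPollInterval -= (System.currentTimeMillis() - lastRefreshTime);
          }
          if (nextPollInterval < 0 || nodeInfo == null) {
            // Budget exhausted or the poll timed out: refresh the registry (stubbed) and reset.
            nextPollInterval = REFRESH_INTERVAL;
            lastRefreshTime = System.currentTimeMillis();
          }
        }
      }
    }

Note that each successful poll subtracts the total time elapsed since the last refresh, not just the time spent in that one poll, so a burst of re-enabled nodes pulls the next refresh earlier.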
+
+  private void trySchedulingPendingTasks() {
+    scheduleLock.lock();
+    try {
+      scheduleCondition.signal();
+    } finally {
+      scheduleLock.unlock();
+    }
+  }
+
+  private class SchedulerCallable implements Callable<Void> {
+    private AtomicBoolean isShutdown = new AtomicBoolean(false);
+
+    @Override
+    public Void call() {
+      while (!isShutdown.get() && !Thread.currentThread().isInterrupted()) {
+        scheduleLock.lock();
+        try {
+          scheduleCondition.await();
         } catch (InterruptedException e) {
           if (isShutdown.get()) {
-            LOG.info("Disabled node wait interrupted after shutdown. Stopping the disabled node poll");
+            LOG.info("Scheduler thread interrupted after shutdown");
             break;
           } else {
-            LOG.warn("Interrupted while waiting for disabled nodes.");
-            throw new RuntimeException("Interrupted while waiting for disabled nodes", e);
+            LOG.warn("Scheduler thread interrupted without being shutdown");
+            throw new RuntimeException("Scheduler thread interrupted without being shutdown", e);
           }
+        } finally {
+          scheduleLock.unlock();
         }
+        // Schedule outside of the scheduleLock - which should only be used to wait on the condition.
+        schedulePendingTasks();
       }
       return null;
     }
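trySchedulingPendingTasks() and SchedulerCallable form a simple wakeup pair around a Lock/Condition: every event that could make placement possible (a new task, a freed slot, a new node) signals the condition, and the single scheduler thread wakes up and runs schedulePendingTasks() outside the lock. A minimal sketch of the pattern, with the actual scheduling work stubbed out:

    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;

    public class SchedulerWakeupSketch {
      private final Lock scheduleLock = new ReentrantLock();
      private final Condition scheduleCondition = scheduleLock.newCondition();
      private volatile boolean shutdown = false;

      // Producer side - mirrors trySchedulingPendingTasks(); called from allocateTask,
      // deallocateTask, addNode, and node re-enable paths to nudge the scheduler thread.
      void wakeScheduler() {
        scheduleLock.lock();
        try {
          scheduleCondition.signal();
        } finally {
          scheduleLock.unlock();
        }
      }

      // Consumer side - mirrors SchedulerCallable.call().
      void schedulerLoop() throws InterruptedException {
        while (!shutdown) {
          scheduleLock.lock();
          try {
            scheduleCondition.await(); // park until someone signals
          } finally {
            scheduleLock.unlock();
          }
          doScheduling(); // runs outside the lock, like schedulePendingTasks()
        }
      }

      void doScheduling() { /* walk pending tasks and try to place them */ }
    }

One property worth noting: a signal sent while the scheduler thread is busy inside doScheduling() finds no waiter and is dropped; whether a dropped signal can delay a pending task depends on which later events re-signal the condition.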
@@ -701,68 +793,126 @@ public void shutdown() {

   @VisibleForTesting
   static class NodeInfo implements Delayed {
-    private final float constBackOffFactor;
-    final ServiceInstance host;
+    private final NodeBlacklistConf blacklistConf;
+    final ServiceInstance serviceInstance;
     private final Clock clock;

     long expireTimeMillis = -1;
     private long numSuccessfulTasks = 0;
     private long numSuccessfulTasksAtLastBlacklist = -1;
     float cumulativeBackoffFactor = 1.0f;
-    // A node could be disabled for reasons other than being busy.
-    private boolean disabled = false;
-    // If disabled, the node could be marked as busy.
-    private boolean busy;
+
+    // Indicates whether a node had a recent communication failure.
+    private boolean hadCommFailure = false;

-    NodeInfo(ServiceInstance host, float backoffFactor, Clock clock) {
-      this.host = host;
-      constBackOffFactor = backoffFactor;
+    // Indicates whether a node is disabled - for whatever reason - commFailure, busy, etc.
+    private boolean disabled = false;
+
+    private int numScheduledTasks = 0;
+    private final int numSchedulableTasks;
+
+    /**
+     * Create a NodeInfo bound to a service instance
+     *
+     * @param serviceInstance the associated serviceInstance
+     * @param blacklistConf blacklist configuration
+     * @param clock clock to use to obtain timing information
+     * @param numSchedulableTasksConf number of schedulable tasks on the node. 0 represents
+     *                                auto-detect based on the serviceInstance, -1 indicates
+     *                                unlimited capacity
+     */
+    NodeInfo(ServiceInstance serviceInstance, NodeBlacklistConf blacklistConf, Clock clock,
+        int numSchedulableTasksConf) {
+      Preconditions.checkArgument(numSchedulableTasksConf >= -1, "NumSchedulableTasks must be >=-1");
+      this.serviceInstance = serviceInstance;
+      this.blacklistConf = blacklistConf;
       this.clock = clock;
+
+      if (numSchedulableTasksConf == 0) {
+        int pendingQueueCapacity = 0;
+        String pendingQueueCapacityString = serviceInstance.getProperties()
+            .get(LlapConfiguration.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Setting up node: " + serviceInstance + ", with available capacity="
+              + serviceInstance.getResource().getVirtualCores() + ", pendingQueueCapacity="
+              + pendingQueueCapacityString);
+        }
+
+        if (pendingQueueCapacityString != null) {
+          pendingQueueCapacity = Integer.parseInt(pendingQueueCapacityString);
+        }
+        this.numSchedulableTasks = serviceInstance.getResource().getVirtualCores() + pendingQueueCapacity;
+      } else {
+        this.numSchedulableTasks = numSchedulableTasksConf;
+      }
+      LOG.info("Setting up node: " + serviceInstance + " with schedulableCapacity=" + this.numSchedulableTasks);
+    }
+
+    ServiceInstance getServiceInstance() {
+      return serviceInstance;
     }

     void enableNode() {
       expireTimeMillis = -1;
       disabled = false;
+      hadCommFailure = false;
     }

-    void disableNode(long duration) {
+    void disableNode(boolean commFailure) {
+      long duration = blacklistConf.minDelay;
       long currentTime = clock.getTime();
+      this.hadCommFailure = commFailure;
       disabled = true;
       if (numSuccessfulTasksAtLastBlacklist == numSuccessfulTasks) {
-        // Blacklisted again, without any progress. Will never kick in for the first run.
-        cumulativeBackoffFactor = cumulativeBackoffFactor * constBackOffFactor;
+        // Relying on a task succeeding to reset the exponent. There's no notification of whether
+        // a task gets accepted or not - that would be the ideal point to reset this.
+        cumulativeBackoffFactor = cumulativeBackoffFactor * blacklistConf.backoffFactor;
       } else {
         // Was able to execute something before the last blacklist. Reset the exponent.
         cumulativeBackoffFactor = 1.0f;
       }
-      expireTimeMillis = currentTime + (long) (duration * cumulativeBackoffFactor);
+
+      long delayTime = (long) (duration * cumulativeBackoffFactor);
+      if (delayTime > blacklistConf.maxDelay) {
+        delayTime = blacklistConf.maxDelay;
+      }
+      if (LOG.isInfoEnabled()) {
+        LOG.info("Disabling instance " + serviceInstance + " for " + delayTime + " milliseconds");
+      }
+      expireTimeMillis = currentTime + delayTime;
       numSuccessfulTasksAtLastBlacklist = numSuccessfulTasks;
     }

+    void registerTaskScheduled() {
+      numScheduledTasks++;
+    }
+
     void registerTaskSuccess() {
-      // TODO If a task succeeds, we may have free slots. Mark the node as !busy. Ideally take it out
-      // of the queue for more allocations.
-      // For now, not chanigng the busy status,
-
-      // this.busy = false;
-      // this.disabled = false;
       numSuccessfulTasks++;
+      numScheduledTasks--;
     }

-    public void setBusy(boolean busy) {
-      this.busy = busy;
-    }
-
-    public boolean isBusy() {
-      return busy;
+    void registerUnsuccessfulTaskEnd() {
+      numScheduledTasks--;
     }

     public boolean isDisabled() {
       return disabled;
     }

+    public boolean hadCommFailure() {
+      return hadCommFailure;
+    }
+
+    /* Returning true does not guarantee that the task will run, considering other queries
+       may be running in the system. Also depends upon the capacity usage configuration.
+     */
+    public boolean canAcceptTask() {
+      boolean result = !hadCommFailure && !disabled && serviceInstance.isAlive()
+          && (numSchedulableTasks == -1 || ((numSchedulableTasks - numScheduledTasks) > 0));
+      return result;
+    }
+
     @Override
     public long getDelay(TimeUnit unit) {
       return unit.convert(expireTimeMillis - clock.getTime(), TimeUnit.MILLISECONDS);
@@ -782,10 +932,15 @@ public int compareTo(Delayed o) {

     @Override
     public String toString() {
-      return "NodeInfo{" + "constBackOffFactor=" + constBackOffFactor + ", host=" + host
+      return "NodeInfo{" + "instance=" + serviceInstance
           + ", expireTimeMillis=" + expireTimeMillis + ", numSuccessfulTasks=" + numSuccessfulTasks
           + ", numSuccessfulTasksAtLastBlacklist=" + numSuccessfulTasksAtLastBlacklist
-          + ", cumulativeBackoffFactor=" + cumulativeBackoffFactor + '}';
+          + ", cumulativeBackoffFactor=" + cumulativeBackoffFactor
+          + ", numSchedulableTasks=" + numSchedulableTasks
+          + ", numScheduledTasks=" + numScheduledTasks
+          + ", disabled=" + disabled
+          + ", commFailures=" + hadCommFailure
+          + '}';
     }
   }

@@ -880,6 +1035,7 @@ private void _registerAllocationInHostMap(String host, Map