diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index fabb8ab..c99c4df 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2719,7 +2719,7 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal new TimeValidator(TimeUnit.MILLISECONDS, -1l, true, Long.MAX_VALUE, true), "Amount of time to wait before allocating a request which contains location information," + " to a location other than the ones requested. Set to -1 for an infinite delay, 0" + - "for a no delay. Currently these are the only two supported values" + " for no delay." ), LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE("hive.llap.daemon.task.scheduler.wait.queue.size", 10, "LLAP scheduler maximum queue size.", "llap.daemon.task.scheduler.wait.queue.size"), diff --git llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/ContainerFactory.java llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/ContainerFactory.java index a314391..f1feec7 100644 --- llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/ContainerFactory.java +++ llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/ContainerFactory.java @@ -37,11 +37,10 @@ public ContainerFactory(ApplicationAttemptId appAttemptId, long appIdLong) { } public Container createContainer(Resource capability, Priority priority, String hostname, - int port) { + int port, String nodeHttpAddress) { ContainerId containerId = ContainerId.newContainerId(customAppAttemptId, nextId.getAndIncrement()); NodeId nodeId = NodeId.newInstance(hostname, port); - String nodeHttpAddress = "hostname:0"; // TODO: include UI ports Container container = Container.newInstance(containerId, nodeId, nodeHttpAddress, capability, priority, null); diff --git llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java 
llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java index b57ae1a..af92a61 100644 --- llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java +++ llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java @@ -48,7 +48,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import com.google.common.collect.Sets; -import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import org.apache.commons.lang.mutable.MutableInt; import org.apache.hadoop.conf.Configuration; @@ -58,6 +57,8 @@ import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet; import org.apache.hadoop.hive.llap.registry.ServiceInstanceStateChangeListener; import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; +import org.apache.hadoop.hive.llap.tezplugins.helpers.MonotonicClock; +import org.apache.hadoop.hive.llap.tezplugins.scheduler.LoggingFutureCallback; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; @@ -71,7 +72,6 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.apache.hadoop.yarn.util.SystemClock; import org.apache.tez.common.TezUtils; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; @@ -84,6 +84,8 @@ private static final Logger LOG = LoggerFactory.getLogger(LlapTaskSchedulerService.class); + private static final TaskStartComparator TASK_INFO_COMPARATOR = new TaskStartComparator(); + private final Configuration conf; // interface into the registry service @@ -97,6 +99,9 @@ // Tracks tasks which could not be allocated immediately. 
@VisibleForTesting + // Tasks are tracked in the order requests come in, at different priority levels. + // TODO For tasks at the same priority level, it may be worth attempting to schedule tasks with + // locality information before those without locality information final TreeMap> pendingTasks = new TreeMap<>(new Comparator() { @Override public int compare(Priority o1, Priority o2) { @@ -106,23 +111,30 @@ public int compare(Priority o1, Priority o2) { // Tracks running and queued tasks. Cleared after a task completes. private final ConcurrentMap knownTasks = new ConcurrentHashMap<>(); + // Tracks tasks which are running. Useful for selecting a task to preempt based on when it started. private final TreeMap> runningTasks = new TreeMap<>(); - private static final TaskStartComparator TASK_INFO_COMPARATOR = new TaskStartComparator(); + // Queue for disabled nodes. Nodes make it out of this queue when their expiration timeout is hit. @VisibleForTesting final DelayQueue disabledNodesQueue = new DelayQueue<>(); + @VisibleForTesting + final DelayQueue delayedTaskQueue = new DelayQueue<>(); - private final boolean forceLocation; private final ContainerFactory containerFactory; private final Random random = new Random(); - private final Clock clock; + @VisibleForTesting + final Clock clock; private final ListeningExecutorService nodeEnabledExecutor; private final NodeEnablerCallable nodeEnablerCallable = new NodeEnablerCallable(); + private final ListeningExecutorService delayedTaskSchedulerExecutor; + @VisibleForTesting + final DelayedTaskSchedulerCallable delayedTaskSchedulerCallable; + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock(); private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock(); @@ -140,6 +152,7 @@ public int compare(Priority o1, Priority o2) { private final Map pendingPreemptionsPerHost = new HashMap<>(); private final NodeBlacklistConf 
nodeBlacklistConf; + private final LocalityDelayConf localityDelayConf; // Per daemon private final int memoryPerInstance; @@ -154,6 +167,7 @@ public int compare(Priority o1, Priority o2) { private final LlapRegistryService registry = new LlapRegistryService(false); private volatile ListenableFuture nodeEnablerFuture; + private volatile ListenableFuture delayedTaskSchedulerFuture; private volatile ListenableFuture schedulerFuture; @VisibleForTesting @@ -164,13 +178,14 @@ public int compare(Priority o1, Priority o2) { StatsPerDag dagStats = new StatsPerDag(); public LlapTaskSchedulerService(TaskSchedulerContext taskSchedulerContext) { - this(taskSchedulerContext, new SystemClock()); + this(taskSchedulerContext, new MonotonicClock()); } @VisibleForTesting public LlapTaskSchedulerService(TaskSchedulerContext taskSchedulerContext, Clock clock) { super(taskSchedulerContext); this.clock = clock; + this.delayedTaskSchedulerCallable = createDelayedTaskSchedulerCallable(); try { this.conf = TezUtils.createConfFromUserPayload(taskSchedulerContext.getInitialUserPayload()); } catch (IOException e) { @@ -179,6 +194,9 @@ public LlapTaskSchedulerService(TaskSchedulerContext taskSchedulerContext, Clock } this.containerFactory = new ContainerFactory(taskSchedulerContext.getApplicationAttemptId(), taskSchedulerContext.getCustomClusterIdentifier()); + // TODO Get all of these properties from the registry. This will need to take care of different instances + // publishing potentially different values when we support changing configurations dynamically. + // For now, this can simply be fetched from a single registry instance. 
this.memoryPerInstance = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB); this.coresPerInstance = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_VCPUS_PER_INSTANCE); this.executorsPerInstance = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_NUM_EXECUTORS); @@ -194,11 +212,8 @@ public LlapTaskSchedulerService(TaskSchedulerContext taskSchedulerContext, Clock long localityDelayMs = HiveConf .getTimeVar(conf, ConfVars.LLAP_TASK_SCHEDULER_LOCALITY_DELAY, TimeUnit.MILLISECONDS); - if (localityDelayMs == -1) { - this.forceLocation = true; - } else { - this.forceLocation = false; - } + + this.localityDelayConf = new LocalityDelayConf(localityDelayMs); int memoryPerExecutor = (int) (memoryPerInstance / (float) executorsPerInstance); int coresPerExecutor = (int) (coresPerInstance / (float) executorsPerInstance); @@ -214,6 +229,13 @@ public LlapTaskSchedulerService(TaskSchedulerContext taskSchedulerContext, Clock new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LlapSchedulerNodeEnabler").build()); nodeEnabledExecutor = MoreExecutors.listeningDecorator(executorServiceRaw); + + ExecutorService delayedTaskSchedulerExecutorRaw = Executors.newFixedThreadPool(1, + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LlapSchedulerDelayedTaskHandler") + .build()); + delayedTaskSchedulerExecutor = + MoreExecutors.listeningDecorator(delayedTaskSchedulerExecutorRaw); + ExecutorService schedulerExecutorServiceRaw = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LlapScheduler").build()); schedulerExecutor = MoreExecutors.listeningDecorator(schedulerExecutorServiceRaw); @@ -222,7 +244,7 @@ public LlapTaskSchedulerService(TaskSchedulerContext taskSchedulerContext, Clock + ", vCoresPerInstance=" + coresPerInstance + ", executorsPerInstance=" + executorsPerInstance + ", resourcePerInstanceInferred=" + resourcePerExecutor + ", nodeBlacklistConf=" + nodeBlacklistConf - + ", forceLocation=" + forceLocation); + + ", 
localityDelayMs=" + localityDelayMs); } @Override @@ -235,29 +257,16 @@ public void start() throws IOException { writeLock.lock(); try { nodeEnablerFuture = nodeEnabledExecutor.submit(nodeEnablerCallable); - Futures.addCallback(nodeEnablerFuture, new FutureCallback() { - @Override - public void onSuccess(Void result) { - LOG.info("NodeEnabledThread exited"); - } + Futures.addCallback(nodeEnablerFuture, new LoggingFutureCallback("NodeEnablerThread", LOG)); + + delayedTaskSchedulerFuture = + delayedTaskSchedulerExecutor.submit(delayedTaskSchedulerCallable); + Futures.addCallback(delayedTaskSchedulerFuture, + new LoggingFutureCallback("DelayedTaskSchedulerThread", LOG)); - @Override - public void onFailure(Throwable t) { - LOG.warn("NodeEnabledThread exited with error", t); - } - }); schedulerFuture = schedulerExecutor.submit(schedulerCallable); - Futures.addCallback(schedulerFuture, new FutureCallback() { - @Override - public void onSuccess(Void result) { - LOG.info("SchedulerThread exited"); - } + Futures.addCallback(schedulerFuture, new LoggingFutureCallback("SchedulerThread", LOG)); - @Override - public void onFailure(Throwable t) { - LOG.warn("SchedulerThread exited with error", t); - } - }); registry.start(); registry.registerStateChangeListener(new NodeStateChangeListener()); activeInstances = registry.getInstances(); @@ -305,6 +314,12 @@ public void shutdown() { } nodeEnabledExecutor.shutdownNow(); + delayedTaskSchedulerCallable.shutdown(); + if (delayedTaskSchedulerFuture != null) { + delayedTaskSchedulerFuture.cancel(true); + } + delayedTaskSchedulerExecutor.shutdownNow(); + schedulerCallable.shutdown(); if (schedulerFuture != null) { schedulerFuture.cancel(true); @@ -396,6 +411,7 @@ public void dagComplete() { @Override public void blacklistNode(NodeId nodeId) { LOG.info("BlacklistNode not supported"); + // TODO What happens when we try scheduling a task on a node that Tez at this point thinks is blacklisted. 
} @Override @@ -407,7 +423,7 @@ public void unblacklistNode(NodeId nodeId) { public void allocateTask(Object task, Resource capability, String[] hosts, String[] racks, Priority priority, Object containerSignature, Object clientCookie) { TaskInfo taskInfo = - new TaskInfo(task, clientCookie, priority, capability, hosts, racks, clock.getTime()); + new TaskInfo(localityDelayConf, clock, task, clientCookie, priority, capability, hosts, racks, clock.getTime()); writeLock.lock(); try { dagStats.registerTaskRequest(hosts, racks); @@ -424,7 +440,7 @@ public void allocateTask(Object task, Resource capability, ContainerId container // Container affinity can be implemented as Host affinity for LLAP. Not required until // 1:1 edges are used in Hive. TaskInfo taskInfo = - new TaskInfo(task, clientCookie, priority, capability, null, null, clock.getTime()); + new TaskInfo(localityDelayConf, clock, task, clientCookie, priority, capability, null, null, clock.getTime()); writeLock.lock(); try { dagStats.registerTaskRequest(null, null); @@ -452,7 +468,7 @@ public boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEnd return false; } if (taskInfo.containerId == null) { - if (taskInfo.assigned) { + if (taskInfo.getState() == TaskInfo.State.ASSIGNED) { LOG.error("Task: " + task + " assigned, but could not find the corresponding containerId." @@ -471,7 +487,7 @@ public boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEnd assert nodeInfo != null; // Re-enable the node if preempted - if (taskInfo.preempted) { + if (taskInfo.getState() == TaskInfo.State.PREEMPTED) { LOG.info("Processing deallocateTask for {} which was preempted, EndReason={}", task, endReason); unregisterPendingPreemption(taskInfo.assignedInstance.getHost()); nodeInfo.registerUnsuccessfulTaskEnd(true); @@ -505,7 +521,7 @@ public boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEnd // In case of success, trigger a scheduling run for pending tasks. 
trySchedulingPendingTasks(); - } else if (!taskSucceeded) { + } else { // Task Failed nodeInfo.registerUnsuccessfulTaskEnd(false); if (endReason != null && EnumSet .of(TaskAttemptEndReason.EXECUTOR_BUSY, TaskAttemptEndReason.COMMUNICATION_ERROR) @@ -554,17 +570,13 @@ public boolean hasUnregistered() { return true; } - private ExecutorService createAppCallbackExecutorService() { - return Executors.newSingleThreadExecutor(new ThreadFactoryBuilder() - .setNameFormat("TaskSchedulerAppCaller #%d").setDaemon(true).build()); - } - /** * @param request the list of preferred hosts. null implies any host * @return */ private SelectHostResult selectHost(TaskInfo request) { String[] requestedHosts = request.requestedHosts; + long schedulerAttemptTime = clock.getTime(); readLock.lock(); // Read-lock. Not updating any stats at the moment. try { // Check if any hosts are active. @@ -579,32 +591,61 @@ private SelectHostResult selectHost(TaskInfo request) { return SELECT_HOST_RESULT_INADEQUATE_TOTAL_CAPACITY; } + boolean shouldDelayForLocality = request.shouldDelayForLocality(schedulerAttemptTime); if (requestedHosts != null && requestedHosts.length > 0) { int prefHostCount = -1; - boolean requestedHostExists = false; + boolean requestedHostsWillBecomeAvailable = false; for (String host : requestedHosts) { prefHostCount++; // Pick the first host always. Weak attempt at cache affinity. Set instances = activeInstances.getByHost(host); if (!instances.isEmpty()) { - requestedHostExists = true; for (ServiceInstance inst : instances) { NodeInfo nodeInfo = instanceToNodeMap.get(inst.getWorkerIdentity()); - if (nodeInfo != null && nodeInfo.canAcceptTask()) { - LOG.info("Assigning " + inst + " when looking for " + host + "." + - " FirstRequestedHost=" + (prefHostCount == 0) + - (requestedHosts.length > 1 ? "#prefLocations=" + requestedHosts.length : "")); - return new SelectHostResult(inst, nodeInfo); + if (nodeInfo != null) { + if (nodeInfo.canAcceptTask()) { + // Successfully scheduled. 
+ LOG.info( + "Assigning " + nodeToString(inst, nodeInfo) + " when looking for " + host + + ". local=true" + " FirstRequestedHost=" + (prefHostCount == 0) + + (requestedHosts.length > 1 ? ", #prefLocations=" + requestedHosts.length : + "")); + return new SelectHostResult(inst, nodeInfo); + } else { + // The node cannot accept a task at the moment. + if (shouldDelayForLocality) { + // Perform some checks on whether the node will become available or not. + if (request.shouldForceLocality()) { + requestedHostsWillBecomeAvailable = true; + } else { + if (nodeInfo.getEnableTime() > request.getLocalityDelayTimeout() && + nodeInfo.isDisabled() && nodeInfo.hadCommFailure()) { + // This node will likely be activated after the task timeout expires. + } else { + // Worth waiting for the timeout. + requestedHostsWillBecomeAvailable = true; + } + } + } + } + } else { + LOG.warn( + "Null NodeInfo when attempting to get host with worker identity {}, and host {}", + inst.getWorkerIdentity(), host); + // Leave requestedHostWillBecomeAvailable as is. If some other host is found - delay, + // else ends up allocating to a random host immediately. } } } } // Check if forcing the location is required. - if (forceLocation) { - if (requestedHostExists) { + if (shouldDelayForLocality) { + if (requestedHostsWillBecomeAvailable) { if (LOG.isDebugEnabled()) { LOG.debug("Skipping non-local location allocation for [" + request.task + - "] when trying to allocate on [" + Arrays.toString(requestedHosts) + "]"); + "] when trying to allocate on [" + Arrays.toString(requestedHosts) + "]" + + ". 
ScheduleAttemptTime=" + schedulerAttemptTime + ", taskDelayTimeout=" + + request.getLocalityDelayTimeout()); } return SELECT_HOST_RESULT_DELAYED_LOCALITY; } else { @@ -625,10 +666,11 @@ private SelectHostResult selectHost(TaskInfo request) { for (int i = 0; i < all.length; i++) { Entry inst = all[(i + n) % all.length]; if (inst.getValue().canAcceptTask()) { - LOG.info("Assigning " + inst + " when looking for any host, from #hosts=" + all.length + - ", requestedHosts=" + - ((requestedHosts == null || requestedHosts.length == 0) ? "null" : - Arrays.toString(requestedHosts))); + LOG.info( + "Assigning " + nodeToString(inst.getValue().getServiceInstance(), inst.getValue()) + + " when looking for any host, from #hosts=" + all.length + ", requestedHosts=" + + ((requestedHosts == null || requestedHosts.length == 0) ? "null" : + Arrays.toString(requestedHosts))); return new SelectHostResult(inst.getValue().getServiceInstance(), inst.getValue()); } } @@ -704,6 +746,7 @@ private void addPendingTask(TaskInfo taskInfo) { tasksAtPriority = new LinkedList<>(); pendingTasks.put(taskInfo.priority, tasksAtPriority); } + // Delayed tasks will not kick in right now. That will happen in the scheduling loop. tasksAtPriority.add(taskInfo); knownTasks.putIfAbsent(taskInfo.task, taskInfo); } finally { @@ -748,7 +791,7 @@ private TaskInfo unregisterTask(Object task) { try { TaskInfo taskInfo = knownTasks.remove(task); if (taskInfo != null) { - if (taskInfo.assigned) { + if (taskInfo.getState() == TaskInfo.State.ASSIGNED) { // Remove from the running list. 
int priority = taskInfo.priority.getPriority(); Set tasksAtPriority = runningTasks.get(priority); @@ -804,6 +847,9 @@ protected void schedulePendingTasks() { } taskInfo.triedAssigningTask(); ScheduleResult scheduleResult = scheduleTask(taskInfo); + if (LOG.isDebugEnabled()) { + LOG.debug("ScheduleResult for Task: {} = {}", taskInfo, scheduleResult); + } if (scheduleResult == ScheduleResult.SCHEDULED) { taskIter.remove(); } else { @@ -814,6 +860,11 @@ protected void schedulePendingTasks() { // Preempt only if there's no pending preemptions to avoid preempting twice for a task. String[] potentialHosts; if (scheduleResult == ScheduleResult.DELAYED_LOCALITY) { + + // Add the task to the delayed task queue if it does not already exist. + maybeAddToDelayedTaskQueue(taskInfo); + + // Try preempting a lower priority task in any case. // preempt only on specific hosts, if no preemptions already exist on those. potentialHosts = taskInfo.requestedHosts; //Protect against a bad location being requested. @@ -884,7 +935,8 @@ private ScheduleResult scheduleTask(TaskInfo taskInfo) { Container container = containerFactory.createContainer(resourcePerExecutor, taskInfo.priority, nsPair.getServiceInstance().getHost(), - nsPair.getServiceInstance().getRpcPort()); + nsPair.getServiceInstance().getRpcPort(), + nsPair.getServiceInstance().getServicesAddress()); writeLock.lock(); // While updating local structures try { LOG.info("Assigned task {} to container {}", taskInfo, container.getId()); @@ -995,9 +1047,82 @@ private void unregisterPendingPreemption(String host) { } } + private void maybeAddToDelayedTaskQueue(TaskInfo taskInfo) { + // There's no point adding a task with forceLocality set - since that will never exit the queue. + // Add other tasks if they are not already in the queue. 
+ if (!taskInfo.shouldForceLocality() && !taskInfo.isInDelayedQueue()) { + taskInfo.setInDelayedQueue(true); + delayedTaskQueue.add(taskInfo); + } + } + + private String nodeToString(ServiceInstance serviceInstance, NodeInfo nodeInfo) { + return serviceInstance.getHost() + ":" + serviceInstance.getRpcPort() + ", workerIdentity=" + + serviceInstance.getWorkerIdentity() + ", webAddress=" + + serviceInstance.getServicesAddress() + ", currentlyScheduledTasksOnNode=" + nodeInfo.numScheduledTasks; + } + + + + // ------ Inner classes defined after this point ------ + + @VisibleForTesting + class DelayedTaskSchedulerCallable implements Callable { + + private final AtomicBoolean isShutdown = new AtomicBoolean(false); + + @Override + public Void call() { + while (true) { + try { + TaskInfo taskInfo = getNextTask(); + taskInfo.setInDelayedQueue(false); + // Tasks can exist in the delayed queue even after they have been scheduled. + // Trigger scheduling only if the task is still in PENDING state. + processEvictedTask(taskInfo); + + } catch (InterruptedException e) { + if (isShutdown.get()) { + LOG.info("DelayedTaskScheduler thread interrupted after shutdown"); + break; + } else { + LOG.warn("DelayedTaskScheduler thread interrupted before being shutdown"); + throw new RuntimeException( + "DelayedTaskScheduler thread interrupted without being shutdown", e); + } + } + } + return null; + } + + public void shutdown() { + isShutdown.set(true); + } + + public TaskInfo getNextTask() throws InterruptedException { + TaskInfo taskInfo = delayedTaskQueue.take(); + return taskInfo; + } + + public void processEvictedTask(TaskInfo taskInfo) { + if (shouldScheduleTask(taskInfo)) { + trySchedulingPendingTasks(); + } + } + + public boolean shouldScheduleTask(TaskInfo taskInfo) { + return taskInfo.getState() == TaskInfo.State.PENDING; + } + } + + @VisibleForTesting + DelayedTaskSchedulerCallable createDelayedTaskSchedulerCallable() { + return new DelayedTaskSchedulerCallable(); + } + private 
class NodeEnablerCallable implements Callable { - private AtomicBoolean isShutdown = new AtomicBoolean(false); + private final AtomicBoolean isShutdown = new AtomicBoolean(false); private static final long REFRESH_INTERVAL = 10000l; long nextPollInterval = REFRESH_INTERVAL; long lastRefreshTime; @@ -1005,13 +1130,13 @@ private void unregisterPendingPreemption(String host) { @Override public Void call() { - lastRefreshTime = System.currentTimeMillis(); + lastRefreshTime = LlapTaskSchedulerService.this.clock.getTime(); while (!isShutdown.get() && !Thread.currentThread().isInterrupted()) { try { while (true) { NodeInfo nodeInfo = disabledNodesQueue.poll(nextPollInterval, TimeUnit.MILLISECONDS); if (nodeInfo != null) { - long currentTime = System.currentTimeMillis(); + long currentTime = LlapTaskSchedulerService.this.clock.getTime(); // A node became available. Enable the node and try scheduling. reenableDisabledNode(nodeInfo); trySchedulingPendingTasks(); @@ -1022,7 +1147,7 @@ public Void call() { if (nextPollInterval < 0 || nodeInfo == null) { // timeout expired. Reset the poll interval and refresh nodes. nextPollInterval = REFRESH_INTERVAL; - lastRefreshTime = System.currentTimeMillis(); + lastRefreshTime = LlapTaskSchedulerService.this.clock.getTime(); // TODO Get rid of this polling once we have notificaitons from the registry sub-system if (LOG.isDebugEnabled()) { LOG.debug("Refreshing instances based on poll interval"); @@ -1086,6 +1211,8 @@ public Void call() { // will be handled in the next run. // A new request may come in right after this is set to false, but before the actual scheduling. // This will be handled in this run, but will cause an immediate run after, which is harmless. + // This is mainly to handle a trySchedule request while in the middle of a run - since the event + // which triggered it may not be processed for all tasks in the run. 
pendingScheduleInvodations.set(false); // Schedule outside of the scheduleLock - which should only be used to wait on the condition. schedulePendingTasks(); @@ -1099,6 +1226,8 @@ public void shutdown() { } } + // ------ Additional static classes defined after this point ------ + @VisibleForTesting static class NodeInfo implements Delayed { private final NodeBlacklistConf blacklistConf; @@ -1111,6 +1240,8 @@ public void shutdown() { float cumulativeBackoffFactor = 1.0f; // Indicates whether a node had a recent communication failure. + // This is primarily for tracking and logging purposes for the moment. + // TODO At some point, treat task rejection and communication failures differently. private boolean hadCommFailure = false; // Indicates whether a node is disabled - for whatever reason - commFailure, busy, etc. @@ -1209,6 +1340,13 @@ void registerUnsuccessfulTaskEnd(boolean wasPreempted) { } } + /** + * @return the time at which this node will be re-enabled + */ + public long getEnableTime() { + return expireTimeMillis; + } + public boolean isDisabled() { return disabled; } @@ -1216,13 +1354,20 @@ public boolean isDisabled() { public boolean hadCommFailure() { return hadCommFailure; } + /* Returning true does not guarantee that the task will run, considering other queries may be running in the system. 
Also depends upon the capacity usage configuration */ public boolean canAcceptTask() { boolean result = !hadCommFailure && !disabled && serviceInstance.isAlive() &&(numSchedulableTasks == -1 || ((numSchedulableTasks - numScheduledTasks) > 0)); - LOG.info("canAcceptTask={}, numScheduledTasks={}, numSchedulableTasks={}, hadCommFailure={}, disabled={}, serviceInstance.isAlive={}", result, numScheduledTasks, numSchedulableTasks, hadCommFailure, disabled, serviceInstance.isAlive()); + if (LOG.isInfoEnabled()) { + LOG.info("Node[" + serviceInstance.getHost() + ":" + serviceInstance.getRpcPort() + ", " + + serviceInstance.getWorkerIdentity() + "]: " + + "canAcceptTask={}, numScheduledTasks={}, numSchedulableTasks={}, hadCommFailure={}, disabled={}, serviceInstance.isAlive={}", + result, numScheduledTasks, numSchedulableTasks, hadCommFailure, disabled, + serviceInstance.isAlive()); + } return result; } @@ -1346,11 +1491,23 @@ private void _registerAllocationInHostMap(String host, Map schedulerAttemptTime; + } + + boolean shouldForceLocality() { + return localityDelayTimeout == Long.MAX_VALUE; + } + + long getLocalityDelayTimeout() { + return localityDelayTimeout; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -1436,8 +1627,26 @@ public String toString() { ", containerId=" + containerId + ", assignedInstance=" + assignedInstance + ", uniqueId=" + uniqueId + + ", localityDelayTimeout=" + localityDelayTimeout + '}'; } + + @Override + public long getDelay(TimeUnit unit) { + return unit.convert(localityDelayTimeout - clock.getTime(), TimeUnit.MILLISECONDS); + } + + @Override + public int compareTo(Delayed o) { + TaskInfo other = (TaskInfo) o; + if (other.localityDelayTimeout > this.localityDelayTimeout) { + return -1; + } else if (other.localityDelayTimeout < this.localityDelayTimeout) { + return 1; + } else { + return 0; + } + } } // Newer tasks first. 
@@ -1523,4 +1732,24 @@ public String toString() { '}'; } } + + @VisibleForTesting + static final class LocalityDelayConf { + private final long nodeLocalityDelay; + + public LocalityDelayConf(long nodeLocalityDelay) { + this.nodeLocalityDelay = nodeLocalityDelay; + } + + public long getNodeLocalityDelay() { + return nodeLocalityDelay; + } + + @Override + public String toString() { + return "LocalityDelayConf{" + + "nodeLocalityDelay=" + nodeLocalityDelay + + '}'; + } + } } diff --git llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/MonotonicClock.java llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/MonotonicClock.java new file mode 100644 index 0000000..aaa6f91 --- /dev/null +++ llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/MonotonicClock.java @@ -0,0 +1,24 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.llap.tezplugins.helpers; + +import org.apache.hadoop.util.Time; +import org.apache.hadoop.yarn.util.Clock; + +public class MonotonicClock implements Clock { + @Override + public long getTime() { + return Time.monotonicNow(); + } +} diff --git llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/scheduler/LoggingFutureCallback.java llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/scheduler/LoggingFutureCallback.java new file mode 100644 index 0000000..ea700da --- /dev/null +++ llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/scheduler/LoggingFutureCallback.java @@ -0,0 +1,44 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.llap.tezplugins.scheduler; + +import java.util.concurrent.CancellationException; + +import com.google.common.util.concurrent.FutureCallback; +import org.slf4j.Logger; + +public final class LoggingFutureCallback implements FutureCallback { + private final String componentName; + private final Logger LOG; + + public LoggingFutureCallback(String componentName, Logger log) { + this.componentName = componentName; + LOG = log; + } + + @Override + public void onSuccess(Void result) { + LOG.info("{} exited", componentName); + } + + @Override + public void onFailure(Throwable t) { + if (t instanceof CancellationException) { + LOG.info("{} was cancelled: {}", componentName, t.getMessage()); + } else { + LOG.warn("{} exited with error", componentName, t); + } + } +} diff --git llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java index 36d8ffd..07aa863 100644 --- llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java +++ llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java @@ -15,7 +15,9 @@ package org.apache.hadoop.hive.llap.tezplugins; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doReturn; @@ -26,6 +28,7 @@ import static org.mockito.Mockito.verify; import java.io.IOException; +import java.util.concurrent.DelayQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -37,6 +40,8 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.registry.impl.LlapFixedRegistryImpl; 
import org.apache.hadoop.hive.llap.testhelpers.ControlledClock; +import org.apache.hadoop.hive.llap.tezplugins.helpers.MonotonicClock; +import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; @@ -44,7 +49,6 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.util.Clock; -import org.apache.hadoop.yarn.util.SystemClock; import org.apache.tez.common.TezUtils; import org.apache.tez.dag.api.UserPayload; import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; @@ -62,7 +66,7 @@ private static final String HOST2 = "host2"; private static final String HOST3 = "host3"; - @Test (timeout = 5000) + @Test(timeout = 10000) public void testSimpleLocalAllocation() throws IOException, InterruptedException { TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper(); @@ -77,18 +81,17 @@ public void testSimpleLocalAllocation() throws IOException, InterruptedException tsWrapper.controlScheduler(true); tsWrapper.allocateTask(task1, hosts1, priority1, clientCookie1); - tsWrapper.signalSchedulerRun(); - tsWrapper.awaitSchedulerRun(); + tsWrapper.awaitLocalTaskAllocations(1); verify(tsWrapper.mockAppCallback).taskAllocated(eq(task1), eq(clientCookie1), any(Container.class)); - // TODO Verify this is on host1. 
assertEquals(1, tsWrapper.ts.dagStats.numLocalAllocations); + assertEquals(1, tsWrapper.ts.dagStats.numAllocationsPerHost.get(HOST1).get()); } finally { tsWrapper.shutdown(); } } - @Test (timeout = 5000) + @Test(timeout = 10000) public void testSimpleNoLocalityAllocation() throws IOException, InterruptedException { TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper(); @@ -99,8 +102,7 @@ public void testSimpleNoLocalityAllocation() throws IOException, InterruptedExce Object clientCookie1 = new Object(); tsWrapper.controlScheduler(true); tsWrapper.allocateTask(task1, null, priority1, clientCookie1); - tsWrapper.signalSchedulerRun(); - tsWrapper.awaitSchedulerRun(); + tsWrapper.awaitTotalTaskAllocations(1); verify(tsWrapper.mockAppCallback).taskAllocated(eq(task1), eq(clientCookie1), any(Container.class)); assertEquals(1, tsWrapper.ts.dagStats.numAllocationsNoLocalityRequest); } finally { @@ -109,7 +111,7 @@ public void testSimpleNoLocalityAllocation() throws IOException, InterruptedExce } - @Test(timeout=5000) + @Test(timeout = 10000) public void testPreemption() throws InterruptedException, IOException { Priority priority1 = Priority.newInstance(1); @@ -174,7 +176,7 @@ public void testPreemption() throws InterruptedException, IOException { } - @Test(timeout=5000) + @Test(timeout = 10000) public void testNodeDisabled() throws IOException, InterruptedException { TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper(10000l); try { @@ -233,7 +235,7 @@ public void testNodeDisabled() throws IOException, InterruptedException { } } - @Test(timeout=5000) + @Test(timeout = 10000) public void testNodeReEnabled() throws InterruptedException, IOException { // Based on actual timing. 
TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper(1000l); @@ -307,7 +309,7 @@ public void testNodeReEnabled() throws InterruptedException, IOException { } } - @Test (timeout = 5000) + @Test(timeout = 10000) public void testForceLocalityTest1() throws IOException, InterruptedException { // 2 hosts. 2 per host. 5 requests at the same priority. // First 3 on host1, Next at host2, Last with no host. @@ -316,7 +318,7 @@ public void testForceLocalityTest1() throws IOException, InterruptedException { } - @Test (timeout = 5000) + @Test(timeout = 10000) public void testNoForceLocalityCounterTest1() throws IOException, InterruptedException { // 2 hosts. 2 per host. 5 requests at the same priority. // First 3 on host1, Next at host2, Last with no host. @@ -411,7 +413,7 @@ private void forceLocalityTest1(boolean forceLocality) throws IOException, Inter } } - @Test(timeout = 5000) + @Test(timeout = 10000) public void testForcedLocalityUnknownHost() throws IOException, InterruptedException { Priority priority1 = Priority.newInstance(1); @@ -454,15 +456,13 @@ public void testForcedLocalityUnknownHost() throws IOException, InterruptedExcep } } - - @Test(timeout = 5000) + @Test(timeout = 10000) public void testForcedLocalityPreemption() throws IOException, InterruptedException { Priority priority1 = Priority.newInstance(1); Priority priority2 = Priority.newInstance(2); String [] hosts = new String[] {HOST1, HOST2}; String [] hostsH1 = new String[] {HOST1}; - String [] hostsH2 = new String[] {HOST2}; TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper(2000, hosts, 1, 1, -1l); @@ -485,13 +485,7 @@ public void testForcedLocalityPreemption() throws IOException, InterruptedExcept tsWrapper.allocateTask(task2, hostsH1, priority2, clientCookie2); // This request at a lower priority should not affect anything. 
tsWrapper.allocateTask(task3, hostsH1, priority2, clientCookie3); - while (true) { - tsWrapper.signalSchedulerRun(); - tsWrapper.awaitSchedulerRun(); - if (tsWrapper.ts.dagStats.numLocalAllocations == 2) { - break; - } - } + tsWrapper.awaitLocalTaskAllocations(2); verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class)); ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(Object.class); @@ -517,13 +511,8 @@ public void testForcedLocalityPreemption() throws IOException, InterruptedExcept tsWrapper.deallocateTask(task1, false, TaskAttemptEndReason.INTERNAL_PREEMPTION); - while (true) { - tsWrapper.signalSchedulerRun(); - tsWrapper.awaitSchedulerRun(); - if (tsWrapper.ts.dagStats.numTotalAllocations == 3) { - break; - } - } + tsWrapper.awaitLocalTaskAllocations(3); + verify(tsWrapper.mockAppCallback, times(1)).taskAllocated(eq(task4), eq(clientCookie4), any(Container.class)); @@ -532,11 +521,471 @@ public void testForcedLocalityPreemption() throws IOException, InterruptedExcept } } + @Test(timeout = 10000) + public void testForcedLocalityNotInDelayedQueue() throws IOException, InterruptedException { + String[] hosts = new String[]{HOST1, HOST2}; + + String[] hostsH1 = new String[]{HOST1}; + + TestTaskSchedulerServiceWrapper tsWrapper = + new TestTaskSchedulerServiceWrapper(2000, hosts, 1, 1, -1l); + testNotInQueue(tsWrapper, hostsH1); + } + + @Test(timeout = 10000) + public void testNoLocalityNotInDelayedQueue() throws IOException, InterruptedException { + String[] hosts = new String[]{HOST1}; + + String[] hostsH1 = new String[]{HOST1}; + + TestTaskSchedulerServiceWrapper tsWrapper = + new TestTaskSchedulerServiceWrapper(2000, hosts, 1, 1, 0l); + testNotInQueue(tsWrapper, hostsH1); + } + + private void testNotInQueue(TestTaskSchedulerServiceWrapper tsWrapper, String[] hosts) throws + InterruptedException { + Priority priority1 = Priority.newInstance(1); + try { + tsWrapper.controlScheduler(true); + tsWrapper.allocateTask(hosts, 
priority1); + tsWrapper.allocateTask(hosts, priority1); + tsWrapper.allocateTask(hosts, priority1); // 1 more than capacity. + + tsWrapper.awaitLocalTaskAllocations(2); + + assertEquals(0, tsWrapper.ts.delayedTaskQueue.size()); + + } finally { + tsWrapper.shutdown(); + } + } + + @Test(timeout = 10000) + public void testDelayedLocalityFallbackToNonLocal() throws IOException, InterruptedException { + Priority priority1 = Priority.newInstance(1); + String [] hosts = new String[] {HOST1, HOST2}; + + String [] hostsH1 = new String[] {HOST1}; + + TestTaskSchedulerServiceWrapper tsWrapper = + new TestTaskSchedulerServiceWrapper(2000, hosts, 1, 1, 10000l, true); + LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled + delayedTaskSchedulerCallableControlled = + (LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled) tsWrapper.ts.delayedTaskSchedulerCallable; + ControlledClock clock = tsWrapper.getClock(); + clock.setTime(clock.getTime()); + + // Fill up host1 with tasks. Leave host2 empty. + try { + tsWrapper.controlScheduler(true); + Object task1 = tsWrapper.allocateTask(hostsH1, priority1); + Object task2 = tsWrapper.allocateTask(hostsH1, priority1); + Object task3 = tsWrapper.allocateTask(hostsH1, priority1); // 1 more than capacity. + + tsWrapper.awaitLocalTaskAllocations(2); + + verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class)); + ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(Object.class); + verify(tsWrapper.mockAppCallback, times(2)) + .taskAllocated(argumentCaptor.capture(), any(Object.class), any(Container.class)); + assertEquals(2, argumentCaptor.getAllValues().size()); + assertEquals(task1, argumentCaptor.getAllValues().get(0)); + assertEquals(task2, argumentCaptor.getAllValues().get(1)); + + reset(tsWrapper.mockAppCallback); + + // No capacity left on node1. The next task should be allocated to node2 after it times out. 
+ clock.setTime(clock.getTime() + 10000l); // Past the timeout. + + assertEquals( + LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled.STATE_NOT_RUN, + delayedTaskSchedulerCallableControlled.lastState); + + delayedTaskSchedulerCallableControlled.triggerGetNextTask(); + delayedTaskSchedulerCallableControlled.awaitGetNextTaskProcessing(); + + // Verify that the task was returned from the delayed queue and that the decision was to schedule it + assertEquals( + LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled.STATE_RETURNED_TASK, + delayedTaskSchedulerCallableControlled.lastState); + assertTrue(delayedTaskSchedulerCallableControlled.shouldScheduleTaskTriggered && + delayedTaskSchedulerCallableControlled.lastShouldScheduleTaskResult); + + tsWrapper.awaitChangeInTotalAllocations(2); + + verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class)); + argumentCaptor = ArgumentCaptor.forClass(Object.class); + ArgumentCaptor containerCaptor = ArgumentCaptor.forClass(Container.class); + verify(tsWrapper.mockAppCallback, times(1)) + .taskAllocated(argumentCaptor.capture(), any(Object.class), containerCaptor.capture()); + assertEquals(1, argumentCaptor.getAllValues().size()); + assertEquals(task3, argumentCaptor.getAllValues().get(0)); + Container assignedContainer = containerCaptor.getValue(); + assertEquals(HOST2, assignedContainer.getNodeId().getHost()); + + + assertEquals(2, tsWrapper.ts.dagStats.numLocalAllocations); + assertEquals(1, tsWrapper.ts.dagStats.numNonLocalAllocations); + assertEquals(1, tsWrapper.ts.dagStats.numDelayedAllocations); + assertEquals(2, tsWrapper.ts.dagStats.numAllocationsPerHost.get(HOST1).get()); + assertEquals(1, tsWrapper.ts.dagStats.numAllocationsPerHost.get(HOST2).get()); + + } finally { + tsWrapper.shutdown(); + } + } + + @Test(timeout = 10000) + public void testDelayedLocalityDelayedAllocation() throws InterruptedException, IOException { + Priority priority1 = 
Priority.newInstance(1); + String [] hosts = new String[] {HOST1, HOST2}; + + String [] hostsH1 = new String[] {HOST1}; + + TestTaskSchedulerServiceWrapper tsWrapper = + new TestTaskSchedulerServiceWrapper(2000, hosts, 1, 1, 10000l, true); + LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled + delayedTaskSchedulerCallableControlled = + (LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled) tsWrapper.ts.delayedTaskSchedulerCallable; + ControlledClock clock = tsWrapper.getClock(); + clock.setTime(clock.getTime()); + + // Fill up host1 with tasks. Leave host2 empty. + try { + tsWrapper.controlScheduler(true); + Object task1 = tsWrapper.allocateTask(hostsH1, priority1); + Object task2 = tsWrapper.allocateTask(hostsH1, priority1); + Object task3 = tsWrapper.allocateTask(hostsH1, priority1); // 1 more than capacity. + + tsWrapper.awaitLocalTaskAllocations(2); + + verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class)); + ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(Object.class); + verify(tsWrapper.mockAppCallback, times(2)) + .taskAllocated(argumentCaptor.capture(), any(Object.class), any(Container.class)); + assertEquals(2, argumentCaptor.getAllValues().size()); + assertEquals(task1, argumentCaptor.getAllValues().get(0)); + assertEquals(task2, argumentCaptor.getAllValues().get(1)); + + reset(tsWrapper.mockAppCallback); + + // Move the clock forward 2000ms, and check the delayed queue + clock.setTime(clock.getTime() + 2000l); // Still within the 10000ms locality delay. 
+ + assertEquals( + LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled.STATE_NOT_RUN, + delayedTaskSchedulerCallableControlled.lastState); + + delayedTaskSchedulerCallableControlled.triggerGetNextTask(); + delayedTaskSchedulerCallableControlled.awaitGetNextTaskProcessing(); + + // Verify that the locality delay had not expired, so no attempt was made to schedule the task + assertEquals( + LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled.STATE_TIMEOUT_NOT_EXPIRED, + delayedTaskSchedulerCallableControlled.lastState); + assertFalse(delayedTaskSchedulerCallableControlled.shouldScheduleTaskTriggered); + + tsWrapper.deallocateTask(task1, true, null); + + // Node1 now has free capacity. task3 should be allocated to it. + tsWrapper.awaitChangeInTotalAllocations(2); + + verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class)); + argumentCaptor = ArgumentCaptor.forClass(Object.class); + ArgumentCaptor containerCaptor = ArgumentCaptor.forClass(Container.class); + verify(tsWrapper.mockAppCallback, times(1)) + .taskAllocated(argumentCaptor.capture(), any(Object.class), containerCaptor.capture()); + assertEquals(1, argumentCaptor.getAllValues().size()); + assertEquals(task3, argumentCaptor.getAllValues().get(0)); + Container assignedContainer = containerCaptor.getValue(); + assertEquals(HOST1, assignedContainer.getNodeId().getHost()); + + + assertEquals(3, tsWrapper.ts.dagStats.numLocalAllocations); + assertEquals(0, tsWrapper.ts.dagStats.numNonLocalAllocations); + assertEquals(1, tsWrapper.ts.dagStats.numDelayedAllocations); + assertEquals(3, tsWrapper.ts.dagStats.numAllocationsPerHost.get(HOST1).get()); + + } finally { + tsWrapper.shutdown(); + } + } + + @Test(timeout = 10000) + public void testDelayedQueeTaskSelectionAfterScheduled() throws IOException, + InterruptedException { + Priority priority1 = Priority.newInstance(1); + String [] hosts = new String[] {HOST1, HOST2}; + + String [] 
hostsH1 = new String[] {HOST1}; + + TestTaskSchedulerServiceWrapper tsWrapper = + new TestTaskSchedulerServiceWrapper(2000, hosts, 1, 1, 10000l, true); + LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled + delayedTaskSchedulerCallableControlled = + (LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled) tsWrapper.ts.delayedTaskSchedulerCallable; + ControlledClock clock = tsWrapper.getClock(); + clock.setTime(clock.getTime()); + + // Fill up host1 with tasks. Leave host2 empty. + try { + tsWrapper.controlScheduler(true); + Object task1 = tsWrapper.allocateTask(hostsH1, priority1); + Object task2 = tsWrapper.allocateTask(hostsH1, priority1); + Object task3 = tsWrapper.allocateTask(hostsH1, priority1); // 1 more than capacity. + + tsWrapper.awaitLocalTaskAllocations(2); + + verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class)); + ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(Object.class); + verify(tsWrapper.mockAppCallback, times(2)) + .taskAllocated(argumentCaptor.capture(), any(Object.class), any(Container.class)); + assertEquals(2, argumentCaptor.getAllValues().size()); + assertEquals(task1, argumentCaptor.getAllValues().get(0)); + assertEquals(task2, argumentCaptor.getAllValues().get(1)); + + // Simulate a 2s delay before finishing the task. 
+ clock.setTime(clock.getTime() + 2000); + + assertEquals( + LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled.STATE_NOT_RUN, + delayedTaskSchedulerCallableControlled.lastState); + + delayedTaskSchedulerCallableControlled.triggerGetNextTask(); + delayedTaskSchedulerCallableControlled.awaitGetNextTaskProcessing(); + assertEquals( + LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled.STATE_TIMEOUT_NOT_EXPIRED, + delayedTaskSchedulerCallableControlled.lastState); + assertFalse(delayedTaskSchedulerCallableControlled.shouldScheduleTaskTriggered); + + reset(tsWrapper.mockAppCallback); + + // Now finish task1, which will make capacity for task3 to run. Nothing is coming out of the delayed queue yet. + tsWrapper.deallocateTask(task1, true, null); + tsWrapper.awaitLocalTaskAllocations(3); + + verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class)); + argumentCaptor = ArgumentCaptor.forClass(Object.class); + ArgumentCaptor containerCaptor = ArgumentCaptor.forClass(Container.class); + verify(tsWrapper.mockAppCallback, times(1)) + .taskAllocated(argumentCaptor.capture(), any(Object.class), containerCaptor.capture()); + assertEquals(1, argumentCaptor.getAllValues().size()); + assertEquals(task3, argumentCaptor.getAllValues().get(0)); + Container assignedContainer = containerCaptor.getValue(); + assertEquals(HOST1, assignedContainer.getNodeId().getHost()); + + reset(tsWrapper.mockAppCallback); + + // Move the clock forward and trigger a run. 
+ clock.setTime(clock.getTime() + 8000); // Set to start + 10000 which is the timeout + delayedTaskSchedulerCallableControlled.triggerGetNextTask(); + delayedTaskSchedulerCallableControlled.awaitGetNextTaskProcessing(); + assertEquals( + LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled.STATE_RETURNED_TASK, + delayedTaskSchedulerCallableControlled.lastState); + // Verify that an attempt was made to schedule the task, but the decision was to skip scheduling + assertTrue(delayedTaskSchedulerCallableControlled.shouldScheduleTaskTriggered && + !delayedTaskSchedulerCallableControlled.lastShouldScheduleTaskResult); + + // Ensure there's no more invocations. + verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class)); + verify(tsWrapper.mockAppCallback, never()).taskAllocated(any(Object.class), any(Object.class), any(Container.class)); + + } finally { + tsWrapper.shutdown(); + } + } + + @Test(timeout = 10000) + public void testTaskInfoDelay() { + + LlapTaskSchedulerService.LocalityDelayConf localityDelayConf1 = + new LlapTaskSchedulerService.LocalityDelayConf(3000); + + ControlledClock clock = new ControlledClock(new MonotonicClock()); + clock.setTime(clock.getTime()); + + + // With a timeout of 3000. 
+ LlapTaskSchedulerService.TaskInfo taskInfo = + new LlapTaskSchedulerService.TaskInfo(localityDelayConf1, clock, new Object(), new Object(), + mock(Priority.class), mock(Resource.class), null, null, clock.getTime()); + + assertFalse(taskInfo.shouldForceLocality()); + + assertEquals(3000, taskInfo.getDelay(TimeUnit.MILLISECONDS)); + assertTrue(taskInfo.shouldDelayForLocality(clock.getTime())); + + clock.setTime(clock.getTime() + 500); + assertEquals(2500, taskInfo.getDelay(TimeUnit.MILLISECONDS)); + assertTrue(taskInfo.shouldDelayForLocality(clock.getTime())); + + clock.setTime(clock.getTime() + 2500); + assertEquals(0, taskInfo.getDelay(TimeUnit.MILLISECONDS)); + assertFalse(taskInfo.shouldDelayForLocality(clock.getTime())); + + + // No locality delay + LlapTaskSchedulerService.LocalityDelayConf localityDelayConf2 = + new LlapTaskSchedulerService.LocalityDelayConf(0); + taskInfo = + new LlapTaskSchedulerService.TaskInfo(localityDelayConf2, clock, new Object(), new Object(), + mock(Priority.class), mock(Resource.class), null, null, clock.getTime()); + assertFalse(taskInfo.shouldDelayForLocality(clock.getTime())); + assertFalse(taskInfo.shouldForceLocality()); + assertTrue(taskInfo.getDelay(TimeUnit.MILLISECONDS) < 0); + + // Force locality + LlapTaskSchedulerService.LocalityDelayConf localityDelayConf3 = + new LlapTaskSchedulerService.LocalityDelayConf(-1); + taskInfo = + new LlapTaskSchedulerService.TaskInfo(localityDelayConf3, clock, new Object(), new Object(), + mock(Priority.class), mock(Resource.class), null, null, clock.getTime()); + assertTrue(taskInfo.shouldDelayForLocality(clock.getTime())); + assertTrue(taskInfo.shouldForceLocality()); + assertFalse(taskInfo.getDelay(TimeUnit.MILLISECONDS) < 0); + } + + @Test(timeout = 10000) + public void testLocalityDelayTaskOrdering() throws InterruptedException, IOException { + + LlapTaskSchedulerService.LocalityDelayConf localityDelayConf = + new LlapTaskSchedulerService.LocalityDelayConf(3000); + + ControlledClock 
clock = new ControlledClock(new MonotonicClock()); + clock.setTime(clock.getTime()); + + DelayQueue delayedQueue = new DelayQueue<>(); + + LlapTaskSchedulerService.TaskInfo taskInfo1 = + new LlapTaskSchedulerService.TaskInfo(localityDelayConf, clock, new Object(), new Object(), + mock(Priority.class), mock(Resource.class), null, null, clock.getTime()); + + clock.setTime(clock.getTime() + 1000); + LlapTaskSchedulerService.TaskInfo taskInfo2 = + new LlapTaskSchedulerService.TaskInfo(localityDelayConf, clock, new Object(), new Object(), + mock(Priority.class), mock(Resource.class), null, null, clock.getTime()); + + delayedQueue.add(taskInfo1); + delayedQueue.add(taskInfo2); + + assertEquals(taskInfo1, delayedQueue.peek()); + } + + @Test (timeout = 15000) + public void testDelayedLocalityNodeCommErrorImmediateAllocation() throws IOException, InterruptedException { + Priority priority1 = Priority.newInstance(1); + String [] hosts = new String[] {HOST1, HOST2}; + + String [] hostsH1 = new String[] {HOST1}; + + // Node disable timeout higher than locality delay. + TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper(20000, hosts, 1, 1, 10000l); + + // Fill up host1 with tasks. Leave host2 empty. + try { + long startTime = tsWrapper.getClock().getTime(); + tsWrapper.controlScheduler(true); + Object task1 = tsWrapper.allocateTask(hostsH1, priority1); + Object task2 = tsWrapper.allocateTask(hostsH1, priority1); + Object task3 = tsWrapper.allocateTask(hostsH1, priority1); // 1 more than capacity. 
+ + tsWrapper.awaitLocalTaskAllocations(2); + + verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class)); + ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(Object.class); + verify(tsWrapper.mockAppCallback, times(2)) + .taskAllocated(argumentCaptor.capture(), any(Object.class), any(Container.class)); + assertEquals(2, argumentCaptor.getAllValues().size()); + assertEquals(task1, argumentCaptor.getAllValues().get(0)); + assertEquals(task2, argumentCaptor.getAllValues().get(1)); + + reset(tsWrapper.mockAppCallback); + + // Mark a task as failed due to a comm failure. + tsWrapper.deallocateTask(task1, false, TaskAttemptEndReason.COMMUNICATION_ERROR); + + // Node1 marked as failed, node2 has capacity. + // Timeout for nodes is larger than delay - immediate allocation + tsWrapper.awaitChangeInTotalAllocations(2); + + long thirdAllocateTime = tsWrapper.getClock().getTime(); + long diff = thirdAllocateTime - startTime; + // diffAfterSleep < total sleepTime + assertTrue("Task not allocated in expected time window: duration=" + diff, diff < 10000l); + + verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class)); + argumentCaptor = ArgumentCaptor.forClass(Object.class); + ArgumentCaptor containerCaptor = ArgumentCaptor.forClass(Container.class); + verify(tsWrapper.mockAppCallback, times(1)) + .taskAllocated(argumentCaptor.capture(), any(Object.class), containerCaptor.capture()); + assertEquals(1, argumentCaptor.getAllValues().size()); + assertEquals(task3, argumentCaptor.getAllValues().get(0)); + Container assignedContainer = containerCaptor.getValue(); + assertEquals(HOST2, assignedContainer.getNodeId().getHost()); + + + assertEquals(2, tsWrapper.ts.dagStats.numLocalAllocations); + assertEquals(1, tsWrapper.ts.dagStats.numNonLocalAllocations); + assertEquals(1, tsWrapper.ts.dagStats.numDelayedAllocations); + assertEquals(2, tsWrapper.ts.dagStats.numAllocationsPerHost.get(HOST1).get()); + assertEquals(1, 
tsWrapper.ts.dagStats.numAllocationsPerHost.get(HOST2).get()); + + } finally { + tsWrapper.shutdown(); + } + } + + @Test (timeout = 15000) + public void testDelayedLocalityNodeCommErrorDelayedAllocation() throws IOException, InterruptedException { + Priority priority1 = Priority.newInstance(1); + String [] hosts = new String[] {HOST1, HOST2}; + + String [] hostsH1 = new String[] {HOST1}; + + TestTaskSchedulerServiceWrapper tsWrapper = + new TestTaskSchedulerServiceWrapper(5000, hosts, 1, 1, 10000l, true); + LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled + delayedTaskSchedulerCallableControlled = + (LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled) tsWrapper.ts.delayedTaskSchedulerCallable; + ControlledClock clock = tsWrapper.getClock(); + clock.setTime(clock.getTime()); + + // Fill up host1 with tasks. Leave host2 empty. + try { + tsWrapper.controlScheduler(true); + Object task1 = tsWrapper.allocateTask(hostsH1, priority1); + Object task2 = tsWrapper.allocateTask(hostsH1, priority1); + Object task3 = tsWrapper.allocateTask(hostsH1, priority1); // 1 more than capacity. + + tsWrapper.awaitLocalTaskAllocations(2); + + verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class)); + ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(Object.class); + verify(tsWrapper.mockAppCallback, times(2)) + .taskAllocated(argumentCaptor.capture(), any(Object.class), any(Container.class)); + assertEquals(2, argumentCaptor.getAllValues().size()); + assertEquals(task1, argumentCaptor.getAllValues().get(0)); + assertEquals(task2, argumentCaptor.getAllValues().get(1)); + + reset(tsWrapper.mockAppCallback); + + // Mark a task as failed due to a comm failure. + tsWrapper.deallocateTask(task1, false, TaskAttemptEndReason.COMMUNICATION_ERROR); + + // Node1 has free capacity but is disabled. Node 2 has capacity. 
Delay > re-enable timeout + tsWrapper.ensureNoChangeInTotalAllocations(2, 2000l); + } finally { + tsWrapper.shutdown(); + } + } + private static class TestTaskSchedulerServiceWrapper { static final Resource resource = Resource.newInstance(1024, 1); Configuration conf; TaskSchedulerContext mockAppCallback = mock(TaskSchedulerContext.class); - ControlledClock clock = new ControlledClock(new SystemClock()); + ControlledClock clock = new ControlledClock(new MonotonicClock()); ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(ApplicationId.newInstance(1000, 1), 1); LlapTaskSchedulerServiceForTest ts; @@ -555,14 +1004,21 @@ public void testForcedLocalityPreemption() throws IOException, InterruptedExcept this(disableTimeoutMillis, hosts, numExecutors, waitQueueSize, 0l); } - TestTaskSchedulerServiceWrapper(long disableTimeoutMillis, String[] hosts, int numExecutors, int waitQueueSize, long localityDelayMs) throws + TestTaskSchedulerServiceWrapper(long nodeDisableTimeoutMillis, String[] hosts, int numExecutors, + int waitQueueSize, long localityDelayMs) throws + IOException, InterruptedException { + this(nodeDisableTimeoutMillis, hosts, numExecutors, waitQueueSize, localityDelayMs, false); + } + + TestTaskSchedulerServiceWrapper(long nodeDisableTimeoutMillis, String[] hosts, int numExecutors, + int waitQueueSize, long localityDelayMs, boolean controlledDelayedTaskQueue) throws IOException, InterruptedException { conf = new Configuration(); conf.setStrings(ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname, hosts); conf.setInt(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname, numExecutors); conf.setInt(ConfVars.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE.varname, waitQueueSize); conf.set(ConfVars.LLAP_TASK_SCHEDULER_NODE_REENABLE_MIN_TIMEOUT_MS.varname, - disableTimeoutMillis + "ms"); + nodeDisableTimeoutMillis + "ms"); conf.setBoolean(LlapFixedRegistryImpl.FIXED_REGISTRY_RESOLVE_HOST_NAMES, false); conf.setLong(ConfVars.LLAP_TASK_SCHEDULER_LOCALITY_DELAY.varname, 
localityDelayMs); @@ -571,7 +1027,11 @@ public void testForcedLocalityPreemption() throws IOException, InterruptedExcept UserPayload userPayload = TezUtils.createUserPayloadFromConf(conf); doReturn(userPayload).when(mockAppCallback).getInitialUserPayload(); - ts = new LlapTaskSchedulerServiceForTest(mockAppCallback, clock); + if (controlledDelayedTaskQueue) { + ts = new LlapTaskSchedulerServiceForTestControlled(mockAppCallback, clock); + } else { + ts = new LlapTaskSchedulerServiceForTest(mockAppCallback, clock); + } controlScheduler(true); ts.initialize(); @@ -582,6 +1042,10 @@ public void testForcedLocalityPreemption() throws IOException, InterruptedExcept awaitSchedulerRun(); } + ControlledClock getClock() { + return clock; + } + void controlScheduler(boolean val) { ts.forTestsetControlScheduling(val); } @@ -591,8 +1055,19 @@ void signalSchedulerRun() throws InterruptedException { } void awaitSchedulerRun() throws InterruptedException { - ts.forTestAwaitSchedulingRun(); + ts.forTestAwaitSchedulingRun(-1); } + + /** + * + * @param timeoutMs + * @return false if the time elapsed + * @throws InterruptedException + */ + boolean awaitSchedulerRun(long timeoutMs) throws InterruptedException { + return ts.forTestAwaitSchedulingRun(timeoutMs); + } + void resetAppCallback() { reset(mockAppCallback); } @@ -605,6 +1080,8 @@ void allocateTask(Object task, String[] hosts, Priority priority, Object clientC ts.allocateTask(task, resource, hosts, null, priority, null, clientCookie); } + + void deallocateTask(Object task, boolean succeeded, TaskAttemptEndReason endReason) { ts.deallocateTask(task, succeeded, endReason, null); } @@ -612,6 +1089,60 @@ void deallocateTask(Object task, boolean succeeded, TaskAttemptEndReason endReas void rejectExecution(Object task) { ts.deallocateTask(task, false, TaskAttemptEndReason.EXECUTOR_BUSY, null); } + + + // More complex methods which may wrap multiple operations + Object allocateTask(String[] hosts, Priority priority) { + Object task = 
new Object(); + Object clientCookie = new Object(); + allocateTask(task, hosts, priority, clientCookie); + return task; + } + + public void awaitTotalTaskAllocations(int numTasks) throws InterruptedException { + while (true) { + signalSchedulerRun(); + awaitSchedulerRun(); + if (ts.dagStats.numTotalAllocations == numTasks) { + break; + } + } + } + + public void awaitLocalTaskAllocations(int numTasks) throws InterruptedException { + while (true) { + signalSchedulerRun(); + awaitSchedulerRun(); + if (ts.dagStats.numLocalAllocations == numTasks) { + break; + } + } + } + + public void awaitChangeInTotalAllocations(int previousAllocations) throws InterruptedException { + while (true) { + signalSchedulerRun(); + awaitSchedulerRun(); + if (ts.dagStats.numTotalAllocations > previousAllocations) { + break; + } + Thread.sleep(200l); + } + } + + public void ensureNoChangeInTotalAllocations(int previousAllocations, long timeout) throws + InterruptedException { + long startTime = Time.monotonicNow(); + long timeLeft = timeout; + while (timeLeft > 0) { + signalSchedulerRun(); + awaitSchedulerRun(Math.min(200, timeLeft)); + if (ts.dagStats.numTotalAllocations != previousAllocations) { + throw new IllegalStateException("NumTotalAllocations expected to stay at " + previousAllocations + ". 
Actual=" + ts.dagStats.numTotalAllocations); + } + timeLeft = (startTime + timeout) - Time.monotonicNow(); + } + } } private static class LlapTaskSchedulerServiceForTest extends LlapTaskSchedulerService { @@ -624,14 +1155,16 @@ void rejectExecution(Object task) { private boolean schedulingTriggered = false; private final AtomicInteger numSchedulerRuns = new AtomicInteger(0); - public LlapTaskSchedulerServiceForTest( TaskSchedulerContext appClient, Clock clock) { super(appClient, clock); } + + @Override protected void schedulePendingTasks() { + LOG.info("Attempted schedulPendingTasks"); testLock.lock(); try { if (controlScheduling.get()) { @@ -668,17 +1201,143 @@ void forTestSignalSchedulingRun() throws InterruptedException { } } - void forTestAwaitSchedulingRun() throws InterruptedException { + boolean forTestAwaitSchedulingRun(long timeout) throws InterruptedException { testLock.lock(); try { + boolean success = true; while (!schedulingComplete) { - schedulingCompleteCondition.await(); + if (timeout == -1) { + schedulingCompleteCondition.await(); + } else { + success = schedulingCompleteCondition.await(timeout, TimeUnit.MILLISECONDS); + break; + } } schedulingComplete = false; + return success; } finally { testLock.unlock(); } } } + + private static class LlapTaskSchedulerServiceForTestControlled extends LlapTaskSchedulerServiceForTest { + + private DelayedTaskSchedulerCallableControlled controlledTSCallable; + + public LlapTaskSchedulerServiceForTestControlled( + TaskSchedulerContext appClient, Clock clock) { + super(appClient, clock); + } + + @Override + LlapTaskSchedulerService.DelayedTaskSchedulerCallable createDelayedTaskSchedulerCallable() { + controlledTSCallable = new DelayedTaskSchedulerCallableControlled(); + return controlledTSCallable; + } + + class DelayedTaskSchedulerCallableControlled extends DelayedTaskSchedulerCallable { + private final ReentrantLock lock = new ReentrantLock(); + private final Condition triggerRunCondition = lock.newCondition(); + 
private boolean shouldRun = false; + private final Condition runCompleteCondition = lock.newCondition(); + private boolean runComplete = false; + + static final int STATE_NOT_RUN = 0; + static final int STATE_NULL_FOUND = 1; + static final int STATE_TIMEOUT_NOT_EXPIRED = 2; + static final int STATE_RETURNED_TASK = 3; + + volatile int lastState = STATE_NOT_RUN; + + volatile boolean lastShouldScheduleTaskResult = false; + volatile boolean shouldScheduleTaskTriggered = false; + + @Override + public void processEvictedTask(TaskInfo taskInfo) { + super.processEvictedTask(taskInfo); + signalRunComplete(); + } + + @Override + public TaskInfo getNextTask() throws InterruptedException { + + while (true) { + lock.lock(); + try { + while (!shouldRun) { + triggerRunCondition.await(); + } + // Prevent subsequent runs until a new trigger is set. + shouldRun = false; + } finally { + lock.unlock(); + } + TaskInfo taskInfo = delayedTaskQueue.peek(); + if (taskInfo == null) { + LOG.info("Triggered getTask but the queue is empty"); + lastState = STATE_NULL_FOUND; + signalRunComplete(); + continue; + } + if (taskInfo.shouldDelayForLocality( + LlapTaskSchedulerServiceForTestControlled.this.clock.getTime())) { + LOG.info("Triggered getTask but the first element is not ready to execute"); + lastState = STATE_TIMEOUT_NOT_EXPIRED; + signalRunComplete(); + continue; + } else { + delayedTaskQueue.poll(); // Remove the previously peeked element. 
+ lastState = STATE_RETURNED_TASK; + return taskInfo; + } + } + } + + @Override + public boolean shouldScheduleTask(TaskInfo taskInfo) { + shouldScheduleTaskTriggered = true; + lastShouldScheduleTaskResult = super.shouldScheduleTask(taskInfo); + return lastShouldScheduleTaskResult; + } + + void resetShouldScheduleInformation() { + shouldScheduleTaskTriggered = false; + lastShouldScheduleTaskResult = false; + } + + private void signalRunComplete() { + lock.lock(); + try { + runComplete = true; + runCompleteCondition.signal(); + } finally { + lock.unlock(); + } + } + + void triggerGetNextTask() { + lock.lock(); + try { + shouldRun = true; + triggerRunCondition.signal(); + } finally { + lock.unlock(); + } + } + + void awaitGetNextTaskProcessing() throws InterruptedException { + lock.lock(); + try { + while (!runComplete) { + runCompleteCondition.await(); + } + runComplete = false; + } finally { + lock.unlock(); + } + } + } + } }