diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 95a036d..13b603b 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2483,6 +2483,13 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) { "The number of tasks the AM TaskScheduler will try allocating per node. 0 indicates that\n" + "this should be picked up from the Registry. -1 indicates unlimited capacity; positive\n" + "values indicate a specific bound.", "llap.task.scheduler.num.schedulable.tasks.per.node"), + LLAP_TASK_SCHEDULER_LOCALITY_DELAY( + "hive.llap.task.scheduler.locality.delay", "0ms", + new TimeValidator(TimeUnit.MILLISECONDS, -1l, true, Long.MAX_VALUE, true), + "Amount of time to wait before allocating a request which contains location information" + + " to a location other than the ones requested. Set to -1 for an infinite delay, 0" + + " for no delay. Currently these are the only two supported values." + ), LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE("hive.llap.daemon.task.scheduler.wait.queue.size", 10, "LLAP scheduler maximum queue size.", "llap.daemon.task.scheduler.wait.queue.size"), LLAP_DAEMON_WAIT_QUEUE_COMPARATOR_CLASS_NAME( diff --git llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java index 9821117..6beb4f8 100644 --- llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java +++ llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java @@ -47,12 +47,13 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import com.google.common.collect.Sets; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; +import org.apache.commons.lang.mutable.MutableInt; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.apache.hadoop.hive.llap.configuration.LlapConfiguration; import org.apache.hadoop.hive.llap.registry.ServiceInstance; import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet; import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; @@ -111,6 +112,8 @@ public int compare(Priority o1, Priority o2) { @VisibleForTesting final DelayQueue<NodeInfo> disabledNodesQueue = new DelayQueue<>(); + private final boolean forceLocation; + private final ContainerFactory containerFactory; private final Random random = new Random(); private final Clock clock; @@ -130,7 +133,10 @@ public int compare(Priority o1, Priority o2) { private final SchedulerCallable schedulerCallable = new SchedulerCallable(); private final AtomicBoolean isStopped = new AtomicBoolean(false); + // Tracks total pending preemptions.
private final AtomicInteger pendingPreemptions = new AtomicInteger(0); + // Tracks pending preemptions per host, using the hostname. Always to be accessed inside a lock. + private final Map<String, MutableInt> pendingPreemptionsPerHost = new HashMap<>(); private final NodeBlacklistConf nodeBlacklistConf; @@ -185,6 +191,14 @@ public LlapTaskSchedulerService(TaskSchedulerContext taskSchedulerContext, Clock this.numSchedulableTasksPerNode = HiveConf.getIntVar(conf, ConfVars.LLAP_TASK_SCHEDULER_NUM_SCHEDULABLE_TASKS_PER_NODE); + long localityDelayMs = HiveConf + .getTimeVar(conf, ConfVars.LLAP_TASK_SCHEDULER_LOCALITY_DELAY, TimeUnit.MILLISECONDS); + if (localityDelayMs == -1) { + this.forceLocation = true; + } else { + this.forceLocation = false; + } + int memoryPerExecutor = (int) (memoryPerInstance / (float) executorsPerInstance); int coresPerExecutor = (int) (coresPerInstance / (float) executorsPerInstance); this.resourcePerExecutor = Resource.newInstance(memoryPerExecutor, coresPerExecutor); @@ -206,7 +220,8 @@ public LlapTaskSchedulerService(TaskSchedulerContext taskSchedulerContext, Clock LOG.info("Running with configuration: " + "memoryPerInstance=" + memoryPerInstance + ", vCoresPerInstance=" + coresPerInstance + ", executorsPerInstance=" + executorsPerInstance + ", resourcePerInstanceInferred=" + resourcePerExecutor - + ", nodeBlacklistConf=" + nodeBlacklistConf); + + ", nodeBlacklistConf=" + nodeBlacklistConf + + ", forceLocation=" + forceLocation); } @Override @@ -431,7 +446,7 @@ public boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEnd // Re-enable the node if preempted if (taskInfo.preempted) { LOG.info("Processing deallocateTask for {} which was preempted, EndReason={}", task, endReason); - pendingPreemptions.decrementAndGet(); + unregisterPendingPreemption(taskInfo.assignedInstance.getHost()); nodeInfo.registerUnsuccessfulTaskEnd(true); if (nodeInfo.isDisabled()) { // Re-enable the node. If a task succeeded, a slot may have become available. @@ -514,7 +529,7 @@ private ExecutorService createAppCallbackExecutorService() { * @param request the list of preferred hosts. null implies any host * @return */ - private NodeServiceInstancePair selectHost(TaskInfo request) { + private SelectHostResult selectHost(TaskInfo request) { String[] requestedHosts = request.requestedHosts; readLock.lock(); // Read-lock. Not updating any stats at the moment. try { @@ -528,26 +543,45 @@ private NodeServiceInstancePair selectHost(TaskInfo request) { // If there's no memory available, fail if (getTotalResources().getMemory() <= 0) { - return null; + return SELECT_HOST_RESULT_INADEQUATE_TOTAL_CAPACITY; } - if (requestedHosts != null) { + if (requestedHosts != null && requestedHosts.length > 0) { int prefHostCount = -1; + boolean requestedHostExists = false; for (String host : requestedHosts) { prefHostCount++; // Pick the first host always. Weak attempt at cache affinity. Set<ServiceInstance> instances = activeInstances.getByHost(host); if (!instances.isEmpty()) { + requestedHostExists = true; for (ServiceInstance inst : instances) { NodeInfo nodeInfo = instanceToNodeMap.get(inst); if (nodeInfo != null && nodeInfo.canAcceptTask()) { LOG.info("Assigning " + inst + " when looking for " + host + "." + - " FirstRequestedHost=" + (prefHostCount == 0)); - return new NodeServiceInstancePair(inst, nodeInfo); + " FirstRequestedHost=" + (prefHostCount == 0) + + (requestedHosts.length > 1 ?
"#prefLocations=" + requestedHosts.length : "")); + return new SelectHostResult(inst, nodeInfo); } } } } + // Check if forcing the location is required. + if (forceLocation) { + if (requestedHostExists) { + if (LOG.isDebugEnabled()) { + LOG.debug("Skipping non-local location allocation for [" + request.task + + "] when trying to allocate on [" + Arrays.toString(requestedHosts) + "]"); + } + return SELECT_HOST_RESULT_DELAYED_LOCALITY; + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Not skipping non-local location allocation for [" + request.task + + "] when trying to allocate on [" + Arrays.toString(requestedHosts) + + "] since none of these hosts are part of the known list"); + } + } + } } /* fall through - miss in locality (random scheduling) */ Entry<ServiceInstance, NodeInfo>[] all = @@ -559,12 +593,15 @@ private NodeServiceInstancePair selectHost(TaskInfo request) { for (int i = 0; i < all.length; i++) { Entry<ServiceInstance, NodeInfo> inst = all[(i + n) % all.length]; if (inst.getValue().canAcceptTask()) { - LOG.info("Assigning " + inst + " when looking for any host, from #hosts=" + all.length); - return new NodeServiceInstancePair(inst.getKey(), inst.getValue()); + LOG.info("Assigning " + inst + " when looking for any host, from #hosts=" + all.length + + ", requestedHosts=" + + ((requestedHosts == null || requestedHosts.length == 0) ? "null" : + Arrays.toString(requestedHosts))); + return new SelectHostResult(inst.getKey(), inst.getValue()); } } } - return null; + return SELECT_HOST_RESULT_DELAYED_RESOURCES; } finally { readLock.unlock(); } @@ -716,6 +753,20 @@ private TaskInfo unregisterTask(Object task) { } } + private enum ScheduleResult { + // Successfully scheduled + SCHEDULED, + + // Delayed to find a local match + DELAYED_LOCALITY, + + // Delayed due to temporary resource unavailability + DELAYED_RESOURCES, + + // Inadequate total resources - will never succeed / wait for new executors to become available + INADEQUATE_TOTAL_RESOURCES, + } + @VisibleForTesting protected void schedulePendingTasks() { writeLock.lock(); @@ -737,22 +788,65 @@ protected void schedulePendingTasks() { dagStats.registerDelayedAllocation(); } taskInfo.triedAssigningTask(); - boolean scheduled = scheduleTask(taskInfo); - if (scheduled) { + ScheduleResult scheduleResult = scheduleTask(taskInfo); + if (scheduleResult == ScheduleResult.SCHEDULED) { taskIter.remove(); } else { + // TODO Handle INADEQUATE_TOTAL_RESOURCES eventually - either by throwing an error immediately, + // or waiting for some timeout for new executors and then throwing an error + // Try pre-empting a task so that a higher priority task can take it's place. - // Preempt only if there's not pending preemptions to avoid preempting twice for a task. - LOG.info("Attempting to preempt for {}, pendingPreemptions={}", taskInfo.task, pendingPreemptions.get()); - if (pendingPreemptions.get() == 0) { - preemptTasks(entry.getKey().getPriority(), 1); + // Preempt only if there are no pending preemptions, to avoid preempting twice for a task. + String[] potentialHosts; + if (scheduleResult == ScheduleResult.DELAYED_LOCALITY) { + // Preempt only on specific hosts, if no preemptions already exist on those. + potentialHosts = taskInfo.requestedHosts; + // Protect against a bad location being requested. + if (potentialHosts == null || potentialHosts.length == 0) { + potentialHosts = null; + } + } else { + // Preempt on any host.
+ potentialHosts = null; } + if (potentialHosts != null) { + // Preempt on specific host + boolean shouldPreempt = true; + for (String host : potentialHosts) { + // Preempt only if there are no pending preemptions on the same host + // When the preemption registers, the request at the highest priority will be given the slot, + // even if the initial request was for some other task. + // TODO Maybe register which task the preemption was for, to avoid a bad non-local allocation. + MutableInt pendingHostPreemptions = pendingPreemptionsPerHost.get(host); + if (pendingHostPreemptions != null && pendingHostPreemptions.intValue() > 0) { + shouldPreempt = false; + break; + } + } + if (shouldPreempt) { + LOG.info("Attempting to preempt for {}, pendingPreemptions={} on hosts={}", + taskInfo.task, pendingPreemptions.get(), Arrays.toString(potentialHosts)); + preemptTasks(entry.getKey().getPriority(), 1, potentialHosts); + } + } else { + // Request for a preemption if there's none pending. If a single preemption is pending, + // and this is the next task to be assigned, it will be assigned once that slot becomes available. + if (pendingPreemptions.get() == 0) { + LOG.info("Attempting to preempt for {}, pendingPreemptions={} on any host", + taskInfo.task, pendingPreemptions.get()); + preemptTasks(entry.getKey().getPriority(), 1, null); + } + } + // Since there was an allocation failure - don't try assigning tasks at the next priority. + scheduledAllAtPriority = false; - // Don't try assigning tasks at the next priority. - break; - } - } + // Don't break if this allocation failure was a result of a LOCALITY_DELAY. Others could still be allocated. + if (scheduleResult != ScheduleResult.DELAYED_LOCALITY) { + break; + } + } // end of else - i.e. could not allocate + } // end of loop over pending tasks if (taskListAtPriority.isEmpty()) { // Remove the entry, if there's nothing left at the specific priority level pendingIterator.remove(); @@ -768,11 +862,10 @@ protected void schedulePendingTasks() { } - private boolean scheduleTask(TaskInfo taskInfo) { - NodeServiceInstancePair nsPair = selectHost(taskInfo); - if (nsPair == null) { - return false; - } else { + private ScheduleResult scheduleTask(TaskInfo taskInfo) { + SelectHostResult selectHostResult = selectHost(taskInfo); + if (selectHostResult.scheduleResult == ScheduleResult.SCHEDULED) { + NodeServiceInstancePair nsPair = selectHostResult.nodeServiceInstancePair; Container container = containerFactory.createContainer(resourcePerExecutor, taskInfo.priority, nsPair.getServiceInstance().getHost(), @@ -788,16 +881,21 @@ private boolean scheduleTask(TaskInfo taskInfo) { } finally { writeLock.unlock(); } - getContext().taskAllocated(taskInfo.task, taskInfo.clientCookie, container); - return true; } + return selectHostResult.scheduleResult; } // Removes tasks from the runningList and sends out a preempt request to the system. // Subsequent tasks will be scheduled again once the de-allocate request for the preempted // task is processed.
- private void preemptTasks(int forPriority, int numTasksToPreempt) { + private void preemptTasks(int forPriority, int numTasksToPreempt, String []potentialHosts) { + Set<String> preemptHosts; + if (potentialHosts == null) { + preemptHosts = null; + } else { + preemptHosts = Sets.newHashSet(potentialHosts); + } writeLock.lock(); List<TaskInfo> preemptedTaskList = null; try { @@ -810,17 +908,21 @@ private void preemptTasks(int forPriority, int numTasksToPreempt) { Iterator<TaskInfo> taskInfoIterator = entryAtPriority.getValue().iterator(); while (taskInfoIterator.hasNext() && preemptedCount < numTasksToPreempt) { TaskInfo taskInfo = taskInfoIterator.next(); - preemptedCount++; - LOG.info("preempting {} for task at priority {}", taskInfo, forPriority); - taskInfo.setPreemptedInfo(clock.getTime()); - if (preemptedTaskList == null) { - preemptedTaskList = new LinkedList<>(); + if (preemptHosts == null || preemptHosts.contains(taskInfo.assignedInstance.getHost())) { + // Candidate for preemption. + preemptedCount++; + LOG.info("preempting {} for task at priority {} with potentialHosts={}", taskInfo, + forPriority, potentialHosts == null ? "" : Arrays.toString(potentialHosts)); + taskInfo.setPreemptedInfo(clock.getTime()); + if (preemptedTaskList == null) { + preemptedTaskList = new LinkedList<>(); + } + dagStats.registerTaskPreempted(taskInfo.assignedInstance.getHost()); + preemptedTaskList.add(taskInfo); + registerPendingPreemption(taskInfo.assignedInstance.getHost()); + // Remove from the runningTaskList + taskInfoIterator.remove(); } - dagStats.registerTaskPreempted(taskInfo.assignedInstance.getHost()); - preemptedTaskList.add(taskInfo); - pendingPreemptions.incrementAndGet(); - // Remove from the runningTaskList - taskInfoIterator.remove(); } // Remove entire priority level if it's been emptied. @@ -841,12 +943,43 @@ private void preemptTasks(int forPriority, int numTasksToPreempt) { for (TaskInfo taskInfo : preemptedTaskList) { LOG.info("DBG: Preempting task {}", taskInfo); getContext().preemptContainer(taskInfo.containerId); + // Preemption will finally be registered as a deallocateTask as a result of preemptContainer. + // That resets preemption info and allows additional tasks to be pre-empted if required. } } // The schedule loop will be triggered again when the deallocateTask request comes in for the // preempted task. } + private void registerPendingPreemption(String host) { + writeLock.lock(); + try { + pendingPreemptions.incrementAndGet(); + MutableInt val = pendingPreemptionsPerHost.get(host); + if (val == null) { + val = new MutableInt(0); + pendingPreemptionsPerHost.put(host, val); + } + val.increment(); + } finally { + writeLock.unlock(); + } + } + + private void unregisterPendingPreemption(String host) { + writeLock.lock(); + try { + pendingPreemptions.decrementAndGet(); + MutableInt val = pendingPreemptionsPerHost.get(host); + Preconditions.checkNotNull(val); + val.decrement(); + // Not bothering with removing the entry. There's a limited number of hosts, and a good + // chance that the entry will make it back in when the AM is used for a long duration.
+ } finally { + writeLock.unlock(); + } + } + private class NodeEnablerCallable implements Callable { private AtomicBoolean isShutdown = new AtomicBoolean(false); @@ -1316,6 +1449,28 @@ public int compare(TaskInfo o1, TaskInfo o2) { } } + private static class SelectHostResult { + final NodeServiceInstancePair nodeServiceInstancePair; + final ScheduleResult scheduleResult; + + SelectHostResult(ServiceInstance serviceInstance, NodeInfo nodeInfo) { + this.nodeServiceInstancePair = new NodeServiceInstancePair(serviceInstance, nodeInfo); + this.scheduleResult = ScheduleResult.SCHEDULED; + } + + SelectHostResult(ScheduleResult scheduleResult) { + this.nodeServiceInstancePair = null; + this.scheduleResult = scheduleResult; + } + } + + private static final SelectHostResult SELECT_HOST_RESULT_INADEQUATE_TOTAL_CAPACITY = + new SelectHostResult(ScheduleResult.INADEQUATE_TOTAL_RESOURCES); + private static final SelectHostResult SELECT_HOST_RESULT_DELAYED_LOCALITY = + new SelectHostResult(ScheduleResult.DELAYED_LOCALITY); + private static final SelectHostResult SELECT_HOST_RESULT_DELAYED_RESOURCES = + new SelectHostResult(ScheduleResult.DELAYED_RESOURCES); + private static class NodeServiceInstancePair { final NodeInfo nodeInfo; final ServiceInstance serviceInstance; diff --git llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java index 4eccc06..4c1cbb3 100644 --- llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java +++ llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java @@ -20,6 +20,7 @@ import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.reset; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -108,8 +109,6 @@ public void testSimpleNoLocalityAllocation() throws IOException, InterruptedExce } } - // TODO Add a test to ensure the correct task is being preempted, and the completion for the specific - // task triggers the next task to be scheduled. @Test(timeout=5000) public void testPreemption() throws InterruptedException, IOException { @@ -123,11 +122,11 @@ public void testPreemption() throws InterruptedException, IOException { Object task1 = "task1"; Object clientCookie1 = "cookie1"; Object task2 = "task2"; - Object clientCookie2 = "cookie1"; + Object clientCookie2 = "cookie2"; Object task3 = "task3"; - Object clientCookie3 = "cookie1"; + Object clientCookie3 = "cookie3"; Object task4 = "task4"; - Object clientCookie4 = "cookie1"; + Object clientCookie4 = "cookie4"; tsWrapper.controlScheduler(true); tsWrapper.allocateTask(task1, hosts, priority2, clientCookie1); @@ -309,6 +308,231 @@ public void testNodeReEnabled() throws InterruptedException, IOException { } } + @Test (timeout = 5000) + public void testForceLocalityTest1() throws IOException, InterruptedException { + // 2 hosts. 2 per host. 5 requests at the same priority. + // First 3 on host1, Next at host2, Last with no host. + // Third request on host1 should not be allocated immediately. + forceLocalityTest1(true); + + } + + @Test (timeout = 5000) + public void testNoForceLocalityCounterTest1() throws IOException, InterruptedException { + // 2 hosts. 2 per host. 5 requests at the same priority. + // First 3 on host1, Next at host2, Last with no host. 
+ // Third should allocate on host2, 4th on host2, 5th will wait. + + forceLocalityTest1(false); + } + + private void forceLocalityTest1(boolean forceLocality) throws IOException, InterruptedException { + Priority priority1 = Priority.newInstance(1); + + String[] hosts = new String[] {HOST1, HOST2}; + + String[] hostsH1 = new String[] {HOST1}; + String[] hostsH2 = new String[] {HOST2}; + + TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper(2000, hosts, 1, 1, (forceLocality ? -1l : 0l)); + + try { + Object task1 = "task1"; + Object clientCookie1 = "cookie1"; + Object task2 = "task2"; + Object clientCookie2 = "cookie2"; + Object task3 = "task3"; + Object clientCookie3 = "cookie3"; + Object task4 = "task4"; + Object clientCookie4 = "cookie4"; + Object task5 = "task5"; + Object clientCookie5 = "cookie5"; + + tsWrapper.controlScheduler(true); + //H1 - should allocate + tsWrapper.allocateTask(task1, hostsH1, priority1, clientCookie1); + //H1 - should allocate + tsWrapper.allocateTask(task2, hostsH1, priority1, clientCookie2); + //H1 - no capacity if force, should allocate otherwise + tsWrapper.allocateTask(task3, hostsH1, priority1, clientCookie3); + //H2 - should allocate + tsWrapper.allocateTask(task4, hostsH2, priority1, clientCookie4); + //No location - should allocate if force, no capacity otherwise + tsWrapper.allocateTask(task5, null, priority1, clientCookie5); + + while (true) { + tsWrapper.signalSchedulerRun(); + tsWrapper.awaitSchedulerRun(); + if (tsWrapper.ts.dagStats.numTotalAllocations == 4) { + break; + } + } + + // Verify no preemption requests - since everything is at the same priority + verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class)); + ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(Object.class); + verify(tsWrapper.mockAppCallback, times(4)).taskAllocated(argumentCaptor.capture(), any(Object.class), any(Container.class)); + assertEquals(4, argumentCaptor.getAllValues().size()); + assertEquals(task1, argumentCaptor.getAllValues().get(0)); + assertEquals(task2, argumentCaptor.getAllValues().get(1)); + if (forceLocality) { + // task3 not allocated + assertEquals(task4, argumentCaptor.getAllValues().get(2)); + assertEquals(task5, argumentCaptor.getAllValues().get(3)); + } else { + assertEquals(task3, argumentCaptor.getAllValues().get(2)); + assertEquals(task4, argumentCaptor.getAllValues().get(3)); + } + + //Complete one task on host1. + tsWrapper.deallocateTask(task1, true, null); + + reset(tsWrapper.mockAppCallback); + + // Try scheduling again. 
+ while (true) { + tsWrapper.signalSchedulerRun(); + tsWrapper.awaitSchedulerRun(); + if (tsWrapper.ts.dagStats.numTotalAllocations == 5) { + break; + } + } + verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class)); + argumentCaptor = ArgumentCaptor.forClass(Object.class); + verify(tsWrapper.mockAppCallback, times(1)).taskAllocated(argumentCaptor.capture(), any(Object.class), any(Container.class)); + assertEquals(1, argumentCaptor.getAllValues().size()); + if (forceLocality) { + assertEquals(task3, argumentCaptor.getAllValues().get(0)); + } else { + assertEquals(task5, argumentCaptor.getAllValues().get(0)); + } + + } finally { + tsWrapper.shutdown(); + } + } + + @Test(timeout = 5000) + public void testForcedLocalityUnknownHost() throws IOException, InterruptedException { + Priority priority1 = Priority.newInstance(1); + + String[] hostsKnown = new String[]{HOST1}; + String[] hostsUnknown = new String[]{HOST2}; + + TestTaskSchedulerServiceWrapper tsWrapper = + new TestTaskSchedulerServiceWrapper(2000, hostsKnown, 1, 1, -1l); + try { + Object task1 = "task1"; + Object clientCookie1 = "cookie1"; + + Object task2 = "task2"; + Object clientCookie2 = "cookie2"; + + tsWrapper.controlScheduler(true); + // Should allocate since H2 is not known. + tsWrapper.allocateTask(task1, hostsUnknown, priority1, clientCookie1); + tsWrapper.allocateTask(task2, hostsKnown, priority1, clientCookie2); + + + while (true) { + tsWrapper.signalSchedulerRun(); + tsWrapper.awaitSchedulerRun(); + if (tsWrapper.ts.dagStats.numTotalAllocations == 2) { + break; + } + } + + ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(Object.class); + verify(tsWrapper.mockAppCallback, times(2)) + .taskAllocated(argumentCaptor.capture(), any(Object.class), any(Container.class)); + assertEquals(2, argumentCaptor.getAllValues().size()); + assertEquals(task1, argumentCaptor.getAllValues().get(0)); + assertEquals(task2, argumentCaptor.getAllValues().get(1)); + + + } finally { + tsWrapper.shutdown(); + } + } + + + @Test(timeout = 5000) + public void testForcedLocalityPreemption() throws IOException, InterruptedException { + Priority priority1 = Priority.newInstance(1); + Priority priority2 = Priority.newInstance(2); + String [] hosts = new String[] {HOST1, HOST2}; + + String [] hostsH1 = new String[] {HOST1}; + String [] hostsH2 = new String[] {HOST2}; + + TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper(2000, hosts, 1, 1, -1l); + + // Fill up host1 with p2 tasks. + // Leave host2 empty + // Try running p1 task on host1 - should preempt + + try { + Object task1 = "task1"; + Object clientCookie1 = "cookie1"; + Object task2 = "task2"; + Object clientCookie2 = "cookie2"; + Object task3 = "task3"; + Object clientCookie3 = "cookie3"; + Object task4 = "task4"; + Object clientCookie4 = "cookie4"; + + tsWrapper.controlScheduler(true); + tsWrapper.allocateTask(task1, hostsH1, priority2, clientCookie1); + tsWrapper.allocateTask(task2, hostsH1, priority2, clientCookie2); + // This request at a lower priority should not affect anything. 
+ tsWrapper.allocateTask(task3, hostsH1, priority2, clientCookie3); + while (true) { + tsWrapper.signalSchedulerRun(); + tsWrapper.awaitSchedulerRun(); + if (tsWrapper.ts.dagStats.numLocalAllocations == 2) { + break; + } + } + + verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class)); + ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(Object.class); + verify(tsWrapper.mockAppCallback, times(2)) + .taskAllocated(argumentCaptor.capture(), any(Object.class), any(Container.class)); + assertEquals(2, argumentCaptor.getAllValues().size()); + assertEquals(task1, argumentCaptor.getAllValues().get(0)); + assertEquals(task2, argumentCaptor.getAllValues().get(1)); + + reset(tsWrapper.mockAppCallback); + // Allocate t4 at higher priority. t3 should not be allocated, + // and a preemption should be attempted on host1, despite host2 having available capacity + tsWrapper.allocateTask(task4, hostsH1, priority1, clientCookie4); + + while (true) { + tsWrapper.signalSchedulerRun(); + tsWrapper.awaitSchedulerRun(); + if (tsWrapper.ts.dagStats.numPreemptedTasks == 1) { + break; + } + } + verify(tsWrapper.mockAppCallback).preemptContainer(any(ContainerId.class)); + + tsWrapper.deallocateTask(task1, false, TaskAttemptEndReason.INTERNAL_PREEMPTION); + + while (true) { + tsWrapper.signalSchedulerRun(); + tsWrapper.awaitSchedulerRun(); + if (tsWrapper.ts.dagStats.numTotalAllocations == 3) { + break; + } + } + verify(tsWrapper.mockAppCallback, times(1)).taskAllocated(eq(task4), + eq(clientCookie4), any(Container.class)); + + } finally { + tsWrapper.shutdown(); + } + } + private static class TestTaskSchedulerServiceWrapper { static final Resource resource = Resource.newInstance(1024, 1); Configuration conf; @@ -329,6 +553,11 @@ public void testNodeReEnabled() throws InterruptedException, IOException { TestTaskSchedulerServiceWrapper(long disableTimeoutMillis, String[] hosts, int numExecutors, int waitQueueSize) throws IOException, InterruptedException { + this(disableTimeoutMillis, hosts, numExecutors, waitQueueSize, 0l); + } + + TestTaskSchedulerServiceWrapper(long disableTimeoutMillis, String[] hosts, int numExecutors, int waitQueueSize, long localityDelayMs) throws + IOException, InterruptedException { conf = new Configuration(); conf.setStrings(ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname, hosts); conf.setInt(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname, numExecutors); @@ -336,6 +565,7 @@ public void testNodeReEnabled() throws InterruptedException, IOException { conf.set(ConfVars.LLAP_TASK_SCHEDULER_NODE_REENABLE_MIN_TIMEOUT_MS.varname, disableTimeoutMillis + "ms"); conf.setBoolean(LlapFixedRegistryImpl.FIXED_REGISTRY_RESOLVE_HOST_NAMES, false); + conf.setLong(ConfVars.LLAP_TASK_SCHEDULER_LOCALITY_DELAY.varname, localityDelayMs); doReturn(appAttemptId).when(mockAppCallback).getApplicationAttemptId(); doReturn(11111l).when(mockAppCallback).getCustomClusterIdentifier();
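Note (illustrative sketch, not part of the patch): hive.llap.task.scheduler.locality.delay currently only distinguishes -1 from 0. The snippet below condenses how the value is set and interpreted, mirroring the constructor hunk and the TestTaskSchedulerServiceWrapper above; the class name LocalityDelaySketch and the main() wrapper are hypothetical, the rest uses the existing Hive APIs shown in this patch.

    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.conf.HiveConf.ConfVars;

    public class LocalityDelaySketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // -1 = force locality: a request naming a known host waits (DELAYED_LOCALITY) until that
        //      host has capacity, and preemption is attempted only on the requested hosts.
        // 0  = no delay (default): the scheduler falls through to any available host immediately.
        conf.setLong(ConfVars.LLAP_TASK_SCHEDULER_LOCALITY_DELAY.varname, -1L);

        long localityDelayMs =
            HiveConf.getTimeVar(conf, ConfVars.LLAP_TASK_SCHEDULER_LOCALITY_DELAY, TimeUnit.MILLISECONDS);
        boolean forceLocation = (localityDelayMs == -1); // same derivation as the scheduler constructor
        System.out.println("forceLocation=" + forceLocation);
      }
    }

With forceLocation enabled, selectHost() returns SELECT_HOST_RESULT_DELAYED_LOCALITY when a requested host is known but currently full, and schedulePendingTasks() then limits preemption to the requested hosts, guarded by pendingPreemptionsPerHost, as implemented in the hunks above.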