diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 95a036d..c6e3975 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2483,6 +2483,13 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) {
         "The number of tasks the AM TaskScheduler will try allocating per node. 0 indicates that\n" +
         "this should be picked up from the Registry. -1 indicates unlimited capacity; positive\n" +
         "values indicate a specific bound.", "llap.task.scheduler.num.schedulable.tasks.per.node"),
+    LLAP_TASK_SCHEDULER_LOCALITY_DELAY_MS(
+        "hive.llap.task.scheduler.locality.delay", "0ms",
+        new TimeValidator(TimeUnit.MILLISECONDS, -1L, true, Long.MAX_VALUE, true),
+        "Amount of time to wait before allocating a request which contains location information" +
+        " to a location other than the ones requested. Set to -1 for an infinite delay, 0" +
+        " for no delay. Currently, these are the only two supported values."
+    ),
     LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE("hive.llap.daemon.task.scheduler.wait.queue.size",
         10, "LLAP scheduler maximum queue size.", "llap.daemon.task.scheduler.wait.queue.size"),
     LLAP_DAEMON_WAIT_QUEUE_COMPARATOR_CLASS_NAME(
diff --git llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
index 9821117..27e7365 100644
--- llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
+++ llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
@@ -47,12 +47,13 @@ import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 
+import org.apache.commons.lang.mutable.MutableInt;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
 import org.apache.hadoop.hive.llap.registry.ServiceInstance;
 import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
 import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
@@ -111,6 +112,8 @@ public int compare(Priority o1, Priority o2) {
   @VisibleForTesting
   final DelayQueue<NodeInfo> disabledNodesQueue = new DelayQueue<>();
 
+  private final boolean forceLocation;
+
   private final ContainerFactory containerFactory;
   private final Random random = new Random();
   private final Clock clock;
@@ -130,7 +133,10 @@ public int compare(Priority o1, Priority o2) {
   private final SchedulerCallable schedulerCallable = new SchedulerCallable();
   private final AtomicBoolean isStopped = new AtomicBoolean(false);
 
+  // Tracks total pending preemptions.
   private final AtomicInteger pendingPreemptions = new AtomicInteger(0);
+  // Tracks pending preemptions per host, keyed by hostname. Always to be accessed inside a lock.
+  private final Map<String, MutableInt> pendingPreemptionsPerHost = new HashMap<>();
 
   private final NodeBlacklistConf nodeBlacklistConf;
@@ -185,6 +191,15 @@ public LlapTaskSchedulerService(TaskSchedulerContext taskSchedulerContext, Clock
     this.numSchedulableTasksPerNode =
         HiveConf.getIntVar(conf, ConfVars.LLAP_TASK_SCHEDULER_NUM_SCHEDULABLE_TASKS_PER_NODE);
 
+    long localityDelayMs = HiveConf.getTimeVar(conf, ConfVars.LLAP_TASK_SCHEDULER_LOCALITY_DELAY_MS, TimeUnit.MILLISECONDS);
+    if (localityDelayMs == -1) {
+      // TODO Maybe add a log line about how this can cause things to fail.
+      // Needs a check on whether a host even exists before attempting to force location, otherwise it is guaranteed to fail.
+      this.forceLocation = true;
+    } else {
+      this.forceLocation = false;
+    }
+
     int memoryPerExecutor = (int) (memoryPerInstance / (float) executorsPerInstance);
     int coresPerExecutor = (int) (coresPerInstance / (float) executorsPerInstance);
     this.resourcePerExecutor = Resource.newInstance(memoryPerExecutor, coresPerExecutor);
@@ -206,7 +221,8 @@ public LlapTaskSchedulerService(TaskSchedulerContext taskSchedulerContext, Clock
     LOG.info("Running with configuration: " + "memoryPerInstance=" + memoryPerInstance
         + ", vCoresPerInstance=" + coresPerInstance + ", executorsPerInstance=" + executorsPerInstance
         + ", resourcePerInstanceInferred=" + resourcePerExecutor
-        + ", nodeBlacklistConf=" + nodeBlacklistConf);
+        + ", nodeBlacklistConf=" + nodeBlacklistConf
+        + ", forceLocation=" + forceLocation);
   }
 
   @Override
@@ -431,7 +447,7 @@ public boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEnd
         // Re-enable the node if preempted
         if (taskInfo.preempted) {
           LOG.info("Processing deallocateTask for {} which was preempted, EndReason={}", task, endReason);
-          pendingPreemptions.decrementAndGet();
+          unregisterPendingPreemption(taskInfo.assignedInstance.getHost());
           nodeInfo.registerUnsuccessfulTaskEnd(true);
           if (nodeInfo.isDisabled()) {
             // Re-enable the node. If a task succeeded, a slot may have become available.
@@ -514,7 +530,7 @@ private ExecutorService createAppCallbackExecutorService() {
    * @param request the list of preferred hosts. null implies any host
    * @return
    */
-  private NodeServiceInstancePair selectHost(TaskInfo request) {
+  private SelectHostResult selectHost(TaskInfo request) {
     String[] requestedHosts = request.requestedHosts;
     readLock.lock(); // Read-lock. Not updating any stats at the moment.
     try {
@@ -528,26 +544,45 @@ private NodeServiceInstancePair selectHost(TaskInfo request) {
       // If there's no memory available, fail
       if (getTotalResources().getMemory() <= 0) {
-        return null;
+        return SELECT_HOST_RESULT_INADEQUATE_TOTAL_CAPACITY;
       }
 
-      if (requestedHosts != null) {
+      if (requestedHosts != null && requestedHosts.length > 0) {
         int prefHostCount = -1;
+        boolean requestedHostExists = false;
         for (String host : requestedHosts) {
           prefHostCount++;
           // Pick the first host always. Weak attempt at cache affinity.
           Set<ServiceInstance> instances = activeInstances.getByHost(host);
           if (!instances.isEmpty()) {
+            requestedHostExists = true;
             for (ServiceInstance inst : instances) {
               NodeInfo nodeInfo = instanceToNodeMap.get(inst);
               if (nodeInfo != null && nodeInfo.canAcceptTask()) {
                 LOG.info("Assigning " + inst + " when looking for " + host + "." +
+ - " FirstRequestedHost=" + (prefHostCount == 0)); - return new NodeServiceInstancePair(inst, nodeInfo); + " FirstRequestedHost=" + (prefHostCount == 0) + + (requestedHosts.length > 1 ? "#prefLocations=" + requestedHosts.length : "")); + return new SelectHostResult(inst, nodeInfo); } } } } + // Check if forcing the location is required. + if (forceLocation) { + if (requestedHostExists) { + if (LOG.isDebugEnabled()) { + LOG.debug("Skipping non-local location allocation for [" + request.task + + "] when trying to allocate on [" + Arrays.toString(requestedHosts) + "]"); + } + return SELECT_HOST_RESULT_DELAYED_LOCALITY; + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Not skipping non-local location allocation for [" + request.task + + "] when trying to allocate on [" + Arrays.toString(requestedHosts) + + "] since none of these hosts are part of the known list"); + } + } + } } /* fall through - miss in locality (random scheduling) */ Entry[] all = @@ -559,12 +594,15 @@ private NodeServiceInstancePair selectHost(TaskInfo request) { for (int i = 0; i < all.length; i++) { Entry inst = all[(i + n) % all.length]; if (inst.getValue().canAcceptTask()) { - LOG.info("Assigning " + inst + " when looking for any host, from #hosts=" + all.length); - return new NodeServiceInstancePair(inst.getKey(), inst.getValue()); + LOG.info("Assigning " + inst + " when looking for any host, from #hosts=" + all.length + + ", requestedHosts=" + + ((requestedHosts == null || requestedHosts.length == 0) ? "null" : + Arrays.toString(requestedHosts))); + return new SelectHostResult(inst.getKey(), inst.getValue()); } } } - return null; + return SELECT_HOST_RESULT_DELAYED_RESOURCES; } finally { readLock.unlock(); } @@ -716,6 +754,20 @@ private TaskInfo unregisterTask(Object task) { } } + private enum ScheduleResult { + // Successfully scheduled + SCHEDULED, + + // Delayed to find a local match + DELAYED_LOCALITY, + + // Delayed due to temporary resource availability + DELAYED_RESOURCES, + + // Inadequate total resources - will never succeed / wait for new executors to become available + INADEQUATE_TOTAL_RESOURCES, + } + @VisibleForTesting protected void schedulePendingTasks() { writeLock.lock(); @@ -737,22 +789,60 @@ protected void schedulePendingTasks() { dagStats.registerDelayedAllocation(); } taskInfo.triedAssigningTask(); - boolean scheduled = scheduleTask(taskInfo); - if (scheduled) { + ScheduleResult scheduleResult = scheduleTask(taskInfo); + if (scheduleResult == ScheduleResult.SCHEDULED) { taskIter.remove(); } else { + // TODO Handle INADEQUATE_TOTAL_RESOURCES eventually - either by throwin an error immediately, + // or waiting for some timeout for new executors and then throwing an error + // Try pre-empting a task so that a higher priority task can take it's place. - // Preempt only if there's not pending preemptions to avoid preempting twice for a task. - LOG.info("Attempting to preempt for {}, pendingPreemptions={}", taskInfo.task, pendingPreemptions.get()); - if (pendingPreemptions.get() == 0) { - preemptTasks(entry.getKey().getPriority(), 1); + // Preempt only if there's no pending preemptions to avoid preempting twice for a task. + String[] potentialHosts; + if (scheduleResult == ScheduleResult.DELAYED_LOCALITY) { + // preempt only on specific hosts, if no preemptions already exist on those. + potentialHosts = taskInfo.requestedHosts; + //Protect against a bad location being requested. 
+            if (potentialHosts == null || potentialHosts.length == 0) {
+              potentialHosts = null;
+            }
+          } else {
+            // Preempt on any host.
+            potentialHosts = null;
+          }
+          if (potentialHosts != null) {
+            // Preempt only on the specific hosts requested.
+            boolean shouldPreempt = true;
+            for (String host : potentialHosts) {
+              // Preempt only if there are no pending preemptions on the same host.
+              // When the preemption registers, the request at the highest priority will be given the slot,
+              // even if the initial request was for some other task.
+              // TODO Maybe register which task the preemption was for, to avoid a bad non-local allocation.
+              MutableInt pendingHostPreemptions = pendingPreemptionsPerHost.get(host);
+              if (pendingHostPreemptions != null && pendingHostPreemptions.intValue() > 0) {
+                shouldPreempt = false;
+                break;
+              }
+            }
+            if (shouldPreempt) {
+              LOG.info("Attempting to preempt for {}, pendingPreemptions={} on hosts={}",
+                  taskInfo.task, pendingPreemptions.get(), Arrays.toString(potentialHosts));
+              preemptTasks(entry.getKey().getPriority(), 1, potentialHosts);
+            }
+          } else {
+            // Request a preemption if there's none pending. If a single preemption is pending,
+            // and this is the next task to be assigned, it will be assigned once that slot becomes available.
+            if (pendingPreemptions.get() == 0) {
+              LOG.info("Attempting to preempt for {}, pendingPreemptions={} on any host",
+                  taskInfo.task, pendingPreemptions.get());
+              preemptTasks(entry.getKey().getPriority(), 1, null);
+            }
+          }
+          // Since there was an allocation failure - don't try assigning tasks at the next priority.
           scheduledAllAtPriority = false;
-          // Don't try assigning tasks at the next priority.
           break;
-        }
-      }
+        } // end of else - i.e. could not allocate
+      } // end of loop over pending tasks
       if (taskListAtPriority.isEmpty()) {
         // Remove the entry, if there's nothing left at the specific priority level
         pendingIterator.remove();
@@ -768,11 +858,10 @@ protected void schedulePendingTasks() {
     }
   }
 
-  private boolean scheduleTask(TaskInfo taskInfo) {
-    NodeServiceInstancePair nsPair = selectHost(taskInfo);
-    if (nsPair == null) {
-      return false;
-    } else {
+  private ScheduleResult scheduleTask(TaskInfo taskInfo) {
+    SelectHostResult selectHostResult = selectHost(taskInfo);
+    if (selectHostResult.scheduleResult == ScheduleResult.SCHEDULED) {
+      NodeServiceInstancePair nsPair = selectHostResult.nodeServiceInstancePair;
       Container container =
           containerFactory.createContainer(resourcePerExecutor, taskInfo.priority,
               nsPair.getServiceInstance().getHost(),
@@ -788,16 +877,21 @@ private boolean scheduleTask(TaskInfo taskInfo) {
       } finally {
         writeLock.unlock();
       }
       getContext().taskAllocated(taskInfo.task, taskInfo.clientCookie, container);
-      return true;
     }
+    return selectHostResult.scheduleResult;
   }
 
   // Removes tasks from the runningList and sends out a preempt request to the system.
   // Subsequent tasks will be scheduled again once the de-allocate request for the preempted
   // task is processed.
-  private void preemptTasks(int forPriority, int numTasksToPreempt) {
+  private void preemptTasks(int forPriority, int numTasksToPreempt, String[] potentialHosts) {
+    Set<String> preemptHosts;
+    if (potentialHosts == null) {
+      preemptHosts = null;
+    } else {
+      preemptHosts = Sets.newHashSet(potentialHosts);
+    }
     writeLock.lock();
     List<TaskInfo> preemptedTaskList = null;
     try {
@@ -810,17 +904,21 @@ private void preemptTasks(int forPriority, int numTasksToPreempt) {
         Iterator<TaskInfo> taskInfoIterator = entryAtPriority.getValue().iterator();
         while (taskInfoIterator.hasNext() && preemptedCount < numTasksToPreempt) {
           TaskInfo taskInfo = taskInfoIterator.next();
-          preemptedCount++;
-          LOG.info("preempting {} for task at priority {}", taskInfo, forPriority);
-          taskInfo.setPreemptedInfo(clock.getTime());
-          if (preemptedTaskList == null) {
-            preemptedTaskList = new LinkedList<>();
+          if (preemptHosts == null || preemptHosts.contains(taskInfo.assignedInstance.getHost())) {
+            // Candidate for preemption.
+            preemptedCount++;
+            LOG.info("preempting {} for task at priority {} with potentialHosts={}", taskInfo,
+                forPriority, potentialHosts == null ? "" : Arrays.toString(potentialHosts));
+            taskInfo.setPreemptedInfo(clock.getTime());
+            if (preemptedTaskList == null) {
+              preemptedTaskList = new LinkedList<>();
+            }
+            dagStats.registerTaskPreempted(taskInfo.assignedInstance.getHost());
+            preemptedTaskList.add(taskInfo);
+            registerPendingPreemption(taskInfo.assignedInstance.getHost());
+            // Remove from the runningTaskList
+            taskInfoIterator.remove();
           }
-          dagStats.registerTaskPreempted(taskInfo.assignedInstance.getHost());
-          preemptedTaskList.add(taskInfo);
-          pendingPreemptions.incrementAndGet();
-          // Remove from the runningTaskList
-          taskInfoIterator.remove();
         }
 
         // Remove entire priority level if it's been emptied.
@@ -841,12 +939,43 @@ private void preemptTasks(int forPriority, int numTasksToPreempt) {
       for (TaskInfo taskInfo : preemptedTaskList) {
         LOG.info("DBG: Preempting task {}", taskInfo);
         getContext().preemptContainer(taskInfo.containerId);
+        // Preemption will finally be registered as a deallocateTask as a result of preemptContainer.
+        // That resets preemption info and allows additional tasks to be pre-empted if required.
       }
     }
     // The schedule loop will be triggered again when the deallocateTask request comes in for the
     // preempted task.
   }
 
+  private void registerPendingPreemption(String host) {
+    writeLock.lock();
+    try {
+      pendingPreemptions.incrementAndGet();
+      MutableInt val = pendingPreemptionsPerHost.get(host);
+      if (val == null) {
+        val = new MutableInt(0);
+        pendingPreemptionsPerHost.put(host, val);
+      }
+      val.increment();
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  private void unregisterPendingPreemption(String host) {
+    writeLock.lock();
+    try {
+      pendingPreemptions.decrementAndGet();
+      MutableInt val = pendingPreemptionsPerHost.get(host);
+      Preconditions.checkNotNull(val);
+      val.decrement();
+      // Not bothering with removing the entry. There's a limited number of hosts, and a good
+      // chance that the entry will make it back in when the AM is used for a long duration.
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
   private class NodeEnablerCallable implements Callable<Void> {
 
     private AtomicBoolean isShutdown = new AtomicBoolean(false);
@@ -1316,6 +1445,28 @@ public int compare(TaskInfo o1, TaskInfo o2) {
     }
   }
 
+  private static class SelectHostResult {
+    final NodeServiceInstancePair nodeServiceInstancePair;
+    final ScheduleResult scheduleResult;
+
+    SelectHostResult(ServiceInstance serviceInstance, NodeInfo nodeInfo) {
+      this.nodeServiceInstancePair = new NodeServiceInstancePair(serviceInstance, nodeInfo);
+      this.scheduleResult = ScheduleResult.SCHEDULED;
+    }
+
+    SelectHostResult(ScheduleResult scheduleResult) {
+      this.nodeServiceInstancePair = null;
+      this.scheduleResult = scheduleResult;
+    }
+  }
+
+  private static final SelectHostResult SELECT_HOST_RESULT_INADEQUATE_TOTAL_CAPACITY =
+      new SelectHostResult(ScheduleResult.INADEQUATE_TOTAL_RESOURCES);
+  private static final SelectHostResult SELECT_HOST_RESULT_DELAYED_LOCALITY =
+      new SelectHostResult(ScheduleResult.DELAYED_LOCALITY);
+  private static final SelectHostResult SELECT_HOST_RESULT_DELAYED_RESOURCES =
+      new SelectHostResult(ScheduleResult.DELAYED_RESOURCES);
+
   private static class NodeServiceInstancePair {
     final NodeInfo nodeInfo;
     final ServiceInstance serviceInstance;
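Reviewer note (not part of the patch): the sketch below shows one way to exercise the new knob and mirrors the check the patch adds to the LlapTaskSchedulerService constructor. The class name and the standalone main() are purely illustrative; only hive.llap.task.scheduler.locality.delay, HiveConf.getTimeVar, and ConfVars.varname come from Hive itself.

```java
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;

// Hypothetical helper, not part of the patch: shows how the locality-delay setting
// is expected to map onto the scheduler's forceLocation behaviour.
public class LocalityDelayExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();

    // "-1" forces strict locality: the scheduler delays (and preempts only on the requested
    // hosts) rather than falling back to a random node. "0ms" (the default) disables the delay.
    conf.set(ConfVars.LLAP_TASK_SCHEDULER_LOCALITY_DELAY_MS.varname, "-1");

    // Same derivation as the patched constructor: -1 => forceLocation, anything else => no delay.
    long localityDelayMs =
        HiveConf.getTimeVar(conf, ConfVars.LLAP_TASK_SCHEDULER_LOCALITY_DELAY_MS, TimeUnit.MILLISECONDS);
    boolean forceLocation = (localityDelayMs == -1);

    System.out.println("localityDelayMs=" + localityDelayMs + ", forceLocation=" + forceLocation);
  }
}
```

With forceLocation enabled, selectHost() returns SELECT_HOST_RESULT_DELAYED_LOCALITY whenever a requested host is known but currently full, so requests wait (or trigger host-specific preemption) instead of being scheduled off-node; if none of the requested hosts are known, the request still falls through to random scheduling.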