diff --git llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java index 49ab59a..16ddd10 100644 --- llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java +++ llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java @@ -31,11 +31,16 @@ import java.util.Set; import java.util.TreeMap; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import javax.security.auth.login.AppConfigurationEntry; +import com.google.common.base.Objects; +import com.google.common.collect.Sets; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.ACLProvider; @@ -121,6 +126,9 @@ private static final String UNIQUE_IDENTIFIER = "llap.unique.id"; private Set stateChangeListeners; + private final Map> pathToInstanceCache; + private final Map> nodeToInstanceCache; + private final Lock instanceCacheLock = new ReentrantLock(); // get local hostname private static final String hostname; @@ -157,6 +165,8 @@ public LlapZookeeperRegistryImpl(String instanceName, Configuration conf) { this.instancesCache = null; this.instances = null; this.stateChangeListeners = new HashSet<>(); + this.pathToInstanceCache = new ConcurrentHashMap<>(); + this.nodeToInstanceCache = new ConcurrentHashMap<>(); final boolean isSecure = UserGroupInformation.isSecurityEnabled(); ACLProvider zooKeeperAclProvider = new ACLProvider() { @@ -406,6 +416,7 @@ public void unregister() throws IOException { private final int shufflePort; private final int outputFormatPort; private final String serviceAddress; + private final Resource resource; public 
DynamicServiceInstance(ServiceRecord srv) throws IOException { this.srv = srv; @@ -437,6 +448,27 @@ public DynamicServiceInstance(ServiceRecord srv) throws IOException { AddressTypes.ADDRESS_PORT_FIELD)); this.serviceAddress = RegistryTypeUtils.getAddressField(services.addresses.get(0), AddressTypes.ADDRESS_URI); + int memory = Integer.parseInt(srv.get(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname)); + int vCores = Integer.parseInt(srv.get(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname)); + this.resource = Resource.newInstance(memory, vCores); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + DynamicServiceInstance other = (DynamicServiceInstance) o; + return (this.getWorkerIdentity().equals(other.getWorkerIdentity())); + } + + @Override + public int hashCode() { + return getWorkerIdentity().hashCode(); } @Override @@ -471,9 +503,7 @@ public String getServicesAddress() { @Override public Resource getResource() { - int memory = Integer.parseInt(srv.get(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname)); - int vCores = Integer.parseInt(srv.get(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname)); - return Resource.newInstance(memory, vCores); + return resource; } @Override @@ -497,51 +527,91 @@ public int getOutputFormatPort() { // A new ServiceInstance is created each time. } + private void addToCache(String path, String host, ServiceInstance instance) { + instanceCacheLock.lock(); + try { + putInCache(path, pathToInstanceCache, instance); + putInCache(host, nodeToInstanceCache, instance); + } finally { + instanceCacheLock.unlock(); + } + LOG.debug("Added path={}, host={} instance={} to cache." 
+ + " pathToInstanceCache:size={}, nodeToInstanceCache:size={}", + path, host, instance, pathToInstanceCache.size(), nodeToInstanceCache.size()); + } + + private void removeFromCache(String path, String host) { + instanceCacheLock.lock(); + try { + pathToInstanceCache.remove(path); + nodeToInstanceCache.remove(host); + } finally { + instanceCacheLock.unlock(); + } + LOG.debug("Removed path={}, host={} from cache." + + " pathToInstanceCache:size={}, nodeToInstanceCache:size={}", + path, host, pathToInstanceCache.size(), nodeToInstanceCache.size()); + } + + private void putInCache(String key, Map> cache, + ServiceInstance instance) { + Set instanceSet = cache.get(key); + if (instanceSet == null) { + instanceSet = Sets.newHashSet(); + cache.put(key, instanceSet); + } + instanceSet.add(instance); + } + + private class DynamicServiceInstanceSet implements ServiceInstanceSet { private final PathChildrenCache instancesCache; public DynamicServiceInstanceSet(final PathChildrenCache cache) { this.instancesCache = cache; + populateCache(); } - @Override - public Collection getAll() { - List instances = new ArrayList<>(); - // TODO: we could refresh instanceCache here on previous failure + private void populateCache() { for (ChildData childData : instancesCache.getCurrentData()) { if (childData == null) continue; byte[] data = childData.getData(); if (data == null) continue; - if (!extractNodeName(childData).startsWith(WORKER_PREFIX)) continue; + String nodeName = extractNodeName(childData); + if (!nodeName.startsWith(WORKER_PREFIX)) continue; try { ServiceRecord srv = encoder.fromBytes(childData.getPath(), data); ServiceInstance instance = new DynamicServiceInstance(srv); - instances.add(instance); + addToCache(childData.getPath(), nodeName, instance); } catch (IOException e) { LOG.error("Unable to decode data for zkpath: {}." 
+ " Ignoring from current instances list..", childData.getPath()); } } + } + + @Override + public Collection getAll() { + Set instances = new HashSet<>(); + for(Set instanceSet : pathToInstanceCache.values()) { + instances.addAll(instanceSet); + } return instances; } @Override public Collection getAllInstancesOrdered(boolean consistentIndexes) { Map slotByWorker = new HashMap(); - List unsorted = new LinkedList(); + Set unsorted = Sets.newHashSet(); for (ChildData childData : instancesCache.getCurrentData()) { if (childData == null) continue; byte[] data = childData.getData(); if (data == null) continue; String nodeName = extractNodeName(childData); if (nodeName.startsWith(WORKER_PREFIX)) { - try { - ServiceRecord srv = encoder.fromBytes(childData.getPath(), data); - ServiceInstance instance = new DynamicServiceInstance(srv); - unsorted.add(instance); - } catch (IOException e) { - LOG.error("Unable to decode data for zkpath: {}." + - " Ignoring from current instances list..", childData.getPath()); + Set instances = pathToInstanceCache.get(childData.getPath()); + if (instances != null) { + unsorted.addAll(instances); } } else if (nodeName.startsWith(SLOT_PREFIX)) { slotByWorker.put(extractWorkerIdFromSlot(childData), @@ -599,27 +669,8 @@ public ServiceInstance getInstance(String name) { @Override public Set getByHost(String host) { - Set byHost = new HashSet<>(); - for (ChildData childData : instancesCache.getCurrentData()) { - if (childData == null) continue; - byte[] data = childData.getData(); - if (data == null) continue; - if (!extractNodeName(childData).startsWith(WORKER_PREFIX)) continue; - try { - ServiceRecord srv = encoder.fromBytes(childData.getPath(), data); - ServiceInstance instance = new DynamicServiceInstance(srv); - if (host.equals(instance.getHost())) { - byHost.add(instance); - } - if (LOG.isDebugEnabled()) { - LOG.debug("Locality comparing " + host + " to " + instance.getHost()); - } - } catch (IOException e) { - LOG.error("Unable to decode data 
for zkpath: {}." + - " Ignoring host from current instances list..", childData.getPath()); - } - } - + Set byHost = nodeToInstanceCache.get(host); + byHost = (byHost == null) ? Sets.newHashSet() : byHost; if (LOG.isDebugEnabled()) { LOG.debug("Returning " + byHost.size() + " hosts for locality allocation on " + host); } @@ -643,27 +694,35 @@ public void childEvent(final CuratorFramework client, final PathChildrenCacheEve && client.getState() == CuratorFrameworkState.STARTED, "client is not started"); synchronized (this) { - if (stateChangeListeners.isEmpty()) return; ChildData childData = event.getData(); - if (childData == null) return; + if (childData == null) + return; String nodeName = extractNodeName(childData); - if (!nodeName.startsWith(WORKER_PREFIX)) return; // No need to propagate slot updates. + if (!nodeName.startsWith(WORKER_PREFIX)) + return; // No need to propagate slot updates. LOG.info("{} for zknode {} in llap namespace", event.getType(), childData.getPath()); ServiceInstance instance = extractServiceInstance(event, childData); - for (ServiceInstanceStateChangeListener listener : stateChangeListeners) { - switch (event.getType()) { - case CHILD_ADDED: + switch (event.getType()) { + case CHILD_ADDED: + for (ServiceInstanceStateChangeListener listener : stateChangeListeners) { listener.onCreate(instance); - break; - case CHILD_UPDATED: + } + addToCache(childData.getPath(), nodeName, instance); + break; + case CHILD_UPDATED: + for (ServiceInstanceStateChangeListener listener : stateChangeListeners) { listener.onUpdate(instance); - break; - case CHILD_REMOVED: + } + addToCache(childData.getPath(), nodeName, instance); + break; + case CHILD_REMOVED: + for (ServiceInstanceStateChangeListener listener : stateChangeListeners) { listener.onRemove(instance); - break; - default: - // Ignore all the other events; logged above. } + removeFromCache(childData.getPath(), nodeName); + break; + default: + // Ignore all the other events; logged above. 
} } } diff --git llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java index ff7140d..43d4d1f 100644 --- llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java +++ llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java @@ -701,20 +701,17 @@ public boolean hasUnregistered() { */ private SelectHostResult selectHost(TaskInfo request) { String[] requestedHosts = request.requestedHosts; + String requestedHostsDebugStr = Arrays.toString(requestedHosts); if (LOG.isDebugEnabled()) { - LOG.debug("selectingHost for task={} on hosts={}", request.task, Arrays.toString(requestedHosts)); + LOG.debug("selectingHost for task={} on hosts={}", request.task, + requestedHostsDebugStr); } long schedulerAttemptTime = clock.getTime(); readLock.lock(); // Read-lock. Not updating any stats at the moment. try { - // If there's no memory available, fail - if (getTotalResources().getMemory() <= 0) { - return SELECT_HOST_RESULT_INADEQUATE_TOTAL_CAPACITY; - } - boolean shouldDelayForLocality = request.shouldDelayForLocality(schedulerAttemptTime); LOG.debug("ShouldDelayForLocality={} for task={} on hosts={}", shouldDelayForLocality, - request.task, Arrays.toString(requestedHosts)); + request.task, requestedHostsDebugStr); if (requestedHosts != null && requestedHosts.length > 0) { int prefHostCount = -1; boolean requestedHostsWillBecomeAvailable = false; @@ -729,8 +726,9 @@ private SelectHostResult selectHost(TaskInfo request) { if (nodeInfo.canAcceptTask()) { // Successfully scheduled. LOG.info( - "Assigning " + nodeInfo.toShortString() + " when looking for " + host + - ". local=true" + " FirstRequestedHost=" + (prefHostCount == 0) + + "Assigning {} when looking for {}." + + " local=true FirstRequestedHost={}, #prefLocations={}", + nodeInfo.toShortString(), host, prefHostCount == 0, requestedHosts.length); 
return new SelectHostResult(nodeInfo); } @@ -767,7 +765,7 @@ private SelectHostResult selectHost(TaskInfo request) { if (requestedHostsWillBecomeAvailable) { if (LOG.isDebugEnabled()) { LOG.debug("Delaying local allocation for [" + request.task + - "] when trying to allocate on [" + Arrays.toString(requestedHosts) + "]" + + "] when trying to allocate on [" + requestedHostsDebugStr + "]" + ". ScheduleAttemptTime=" + schedulerAttemptTime + ", taskDelayTimeout=" + request.getLocalityDelayTimeout()); } @@ -775,7 +773,7 @@ private SelectHostResult selectHost(TaskInfo request) { } else { if (LOG.isDebugEnabled()) { LOG.debug("Skipping local allocation for [" + request.task + - "] when trying to allocate on [" + Arrays.toString(requestedHosts) + + "] when trying to allocate on [" + requestedHostsDebugStr + "] since none of these hosts are part of the known list"); } } @@ -799,10 +797,9 @@ private SelectHostResult selectHost(TaskInfo request) { } for (NodeInfo nodeInfo : allNodes) { if (nodeInfo.canAcceptTask()) { - LOG.info("Assigning " + nodeInfo.toShortString() - + " when looking for any host, from #hosts=" + allNodes.size() + ", requestedHosts=" - + ((requestedHosts == null || requestedHosts.length == 0) - ? "null" : Arrays.toString(requestedHosts))); + LOG.info("Assigning {} when looking for any host, from #hosts={}, requestedHosts={}", + nodeInfo.toShortString(), allNodes.size(), ((requestedHosts == null || requestedHosts.length == 0) + ? 
"null" : requestedHostsDebugStr)); return new SelectHostResult(nodeInfo); } } @@ -825,10 +822,10 @@ private SelectHostResult selectHost(TaskInfo request) { for (int i = 0; i < allNodes.size(); i++) { NodeInfo nodeInfo = allNodes.get((i + requestedHostIdx + 1) % allNodes.size()); if (nodeInfo.canAcceptTask()) { - LOG.info("Assigning " + nodeInfo.toShortString() - + " when looking for first requested host, from #hosts=" + allNodes.size() + ", requestedHosts=" - + ((requestedHosts == null || requestedHosts.length == 0) - ? "null" : Arrays.toString(requestedHosts))); + if (LOG.isInfoEnabled()) + LOG.info("Assigning {} when looking for first requested host, from #hosts={}," + + " requestedHosts={}", nodeInfo.toShortString(), allNodes.size(), + ((requestedHosts == null || requestedHosts.length == 0) ? "null" : requestedHostsDebugStr)); return new SelectHostResult(nodeInfo); } } @@ -1036,6 +1033,7 @@ protected void schedulePendingTasks() { } Iterator>> pendingIterator = pendingTasks.entrySet().iterator(); + Resource totalResource = getTotalResources(); while (pendingIterator.hasNext()) { Entry> entry = pendingIterator.next(); List taskListAtPriority = entry.getValue(); @@ -1050,7 +1048,7 @@ protected void schedulePendingTasks() { dagStats.registerDelayedAllocation(); } taskInfo.triedAssigningTask(); - ScheduleResult scheduleResult = scheduleTask(taskInfo); + ScheduleResult scheduleResult = scheduleTask(taskInfo, totalResource); LOG.info("ScheduleResult for Task: {} = {}", taskInfo, scheduleResult); if (scheduleResult == ScheduleResult.SCHEDULED) { taskIter.remove(); @@ -1105,13 +1103,16 @@ protected void schedulePendingTasks() { } } if (shouldPreempt) { - LOG.debug("Preempting for {} on potential hosts={}. TotalPendingPreemptions={}", - taskInfo.task, Arrays.toString(potentialHosts), pendingPreemptions.get()); + if (LOG.isDebugEnabled()) { + LOG.debug("Preempting for {} on potential hosts={}. 
TotalPendingPreemptions={}", + taskInfo.task, Arrays.toString(potentialHosts), pendingPreemptions.get()); + } preemptTasks(entry.getKey().getPriority(), 1, potentialHosts); } else { - LOG.debug( - "Not preempting for {} on potential hosts={}. An existing preemption request exists", - taskInfo.task, Arrays.toString(potentialHosts)); + if (LOG.isDebugEnabled()) { + LOG.debug("Not preempting for {} on potential hosts={}. An existing preemption request exists", + taskInfo.task, Arrays.toString(potentialHosts)); + } } } else { // Either DELAYED_RESOURCES or DELAYED_LOCALITY with an unknown requested host. // Request for a preemption if there's none pending. If a single preemption is pending, @@ -1172,7 +1173,12 @@ private String constructPendingTaskCountsLogMessage() { return sb.toString(); } - private ScheduleResult scheduleTask(TaskInfo taskInfo) { + private ScheduleResult scheduleTask(TaskInfo taskInfo, Resource totalResource) { + Preconditions.checkNotNull(totalResource, "totalResource can not be null"); + // If there's no memory available, fail + if (totalResource.getMemory() <= 0) { + return SELECT_HOST_RESULT_INADEQUATE_TOTAL_CAPACITY.scheduleResult; + } SelectHostResult selectHostResult = selectHost(taskInfo); if (selectHostResult.scheduleResult == ScheduleResult.SCHEDULED) { NodeInfo nodeInfo = selectHostResult.nodeInfo; @@ -1202,12 +1208,7 @@ private ScheduleResult scheduleTask(TaskInfo taskInfo) { // Subsequent tasks will be scheduled again once the de-allocate request for the preempted // task is processed. 
private void preemptTasks(int forPriority, int numTasksToPreempt, String []potentialHosts) { - Set preemptHosts; - if (potentialHosts == null) { - preemptHosts = null; - } else { - preemptHosts = Sets.newHashSet(potentialHosts); - } + Set preemptHosts = null; writeLock.lock(); List preemptedTaskList = null; try { @@ -1217,6 +1218,9 @@ private void preemptTasks(int forPriority, int numTasksToPreempt, String []poten while (iterator.hasNext() && preemptedCount < numTasksToPreempt) { Entry> entryAtPriority = iterator.next(); if (entryAtPriority.getKey() > forPriority) { + if (potentialHosts != null && preemptHosts == null) { + preemptHosts = Sets.newHashSet(potentialHosts); + } Iterator taskInfoIterator = entryAtPriority.getValue().iterator(); while (taskInfoIterator.hasNext() && preemptedCount < numTasksToPreempt) { TaskInfo taskInfo = taskInfoIterator.next();