commit a313c165fe8a507242a0ac5ddc0f960e9e50ca84
Author: Wangda Tan
Date:   Thu Sep 8 15:04:58 2016 -0700

    YARN-3141, app lock

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index 97d29cf..b049174 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -26,8 +26,11 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import com.google.common.collect.ConcurrentHashMultiset;
 import org.apache.commons.lang.time.DateUtils;
 import org.apache.commons.lang.time.FastDateFormat;
 import org.apache.commons.logging.Log;
@@ -71,8 +74,6 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.HashMultiset;
-import com.google.common.collect.Multiset;

 /**
  * Represents an application attempt from the viewpoint of the scheduler.
@@ -97,14 +98,14 @@
   protected final AppSchedulingInfo appSchedulingInfo;
   protected ApplicationAttemptId attemptId;
   protected Map<ContainerId, RMContainer> liveContainers =
-      new HashMap<ContainerId, RMContainer>();
+      new ConcurrentHashMap<>();
   protected final Map<SchedulerRequestKey, Map<NodeId, RMContainer>>
       reservedContainers = new HashMap<>();

-  private final Multiset<SchedulerRequestKey> reReservations =
-      HashMultiset.create();
+  private final ConcurrentHashMultiset<SchedulerRequestKey> reReservations =
+      ConcurrentHashMultiset.create();

-  private Resource resourceLimit = Resource.newInstance(0, 0);
+  private volatile Resource resourceLimit = Resource.newInstance(0, 0);
   private boolean unmanagedAM = true;
   private boolean amRunning = false;
   private LogAggregationContext logAggregationContext;
@@ -138,8 +139,9 @@
    * the application successfully schedules a task (at rack or node local), it
    * is reset to 0.
    */
-  Multiset<SchedulerRequestKey> schedulingOpportunities = HashMultiset.create();
-
+  private ConcurrentHashMultiset<SchedulerRequestKey> schedulingOpportunities =
+      ConcurrentHashMultiset.create();
+
   /**
    * Count how many times the application has been given an opportunity to
    * schedule a non-partitioned resource request at each priority. Each time the
@@ -147,15 +149,16 @@
    * incremented, and each time the application successfully schedules a task,
    * it is reset to 0 when it schedules any task at the corresponding priority.
*/ - Multiset missedNonPartitionedReqSchedulingOpportunity = - HashMultiset.create(); + private ConcurrentHashMultiset + missedNonPartitionedReqSchedulingOpportunity = + ConcurrentHashMultiset.create(); // Time of the last container scheduled at the current allowed level protected Map lastScheduledContainer = - new HashMap<>(); + new ConcurrentHashMap<>(); - protected Queue queue; - protected boolean isStopped = false; + protected volatile Queue queue; + protected volatile boolean isStopped = false; protected String appAMNodePartitionName = CommonNodeLabelsManager.NO_LABEL; @@ -163,6 +166,9 @@ private RMAppAttempt appAttempt; + protected ReentrantReadWriteLock.ReadLock readLock; + protected ReentrantReadWriteLock.WriteLock writeLock; + public SchedulerApplicationAttempt(ApplicationAttemptId applicationAttemptId, String user, Queue queue, ActiveUsersManager activeUsersManager, RMContext rmContext) { @@ -188,14 +194,23 @@ public SchedulerApplicationAttempt(ApplicationAttemptId applicationAttemptId, appSubmissionContext.getLogAggregationContext(); } } + + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + readLock = lock.readLock(); + writeLock = lock.writeLock(); } /** * Get the live containers of the application. * @return live containers of the application */ - public synchronized Collection getLiveContainers() { - return new ArrayList(liveContainers.values()); + public Collection getLiveContainers() { + try { + readLock.lock(); + return new ArrayList<>(liveContainers.values()); + } finally { + readLock.unlock(); + } } public AppSchedulingInfo getAppSchedulingInfo() { @@ -243,20 +258,36 @@ public long getNewContainerId() { return appSchedulingInfo.getSchedulerKeys(); } - public synchronized ResourceRequest getResourceRequest( + public ResourceRequest getResourceRequest( SchedulerRequestKey schedulerKey, String resourceName) { - return appSchedulingInfo.getResourceRequest(schedulerKey, resourceName); + try { + readLock.lock(); + return appSchedulingInfo.getResourceRequest(schedulerKey, resourceName); + } finally { + readLock.unlock(); + } + } - public synchronized int getTotalRequiredResources( + public int getTotalRequiredResources( SchedulerRequestKey schedulerKey) { - ResourceRequest request = - getResourceRequest(schedulerKey, ResourceRequest.ANY); - return request == null ? 0 : request.getNumContainers(); + try { + readLock.lock(); + ResourceRequest request = + getResourceRequest(schedulerKey, ResourceRequest.ANY); + return request == null ? 
0 : request.getNumContainers(); + } finally { + readLock.unlock(); + } } - public synchronized Resource getResource(SchedulerRequestKey schedulerKey) { - return appSchedulingInfo.getResource(schedulerKey); + public Resource getResource(SchedulerRequestKey schedulerKey) { + try { + readLock.lock(); + return appSchedulingInfo.getResource(schedulerKey); + } finally { + readLock.unlock(); + } } public String getQueueName() { @@ -291,38 +322,48 @@ public boolean getUnmanagedAM() { return unmanagedAM; } - public synchronized RMContainer getRMContainer(ContainerId id) { + public RMContainer getRMContainer(ContainerId id) { return liveContainers.get(id); } - public synchronized void addRMContainer( + public void addRMContainer( ContainerId id, RMContainer rmContainer) { - liveContainers.put(id, rmContainer); - if (rmContainer.isRemotelyAllocated()) { - this.attemptResourceUsageAllocatedRemotely.incUsed( - rmContainer.getAllocatedResource()); + try { + writeLock.lock(); + liveContainers.put(id, rmContainer); + if (rmContainer.isRemotelyAllocated()) { + this.attemptResourceUsageAllocatedRemotely.incUsed( + rmContainer.getAllocatedResource()); + } + } finally { + writeLock.unlock(); } } - public synchronized void removeRMContainer(ContainerId containerId) { - RMContainer rmContainer = liveContainers.remove(containerId); - if (rmContainer != null && rmContainer.isRemotelyAllocated()) { - this.attemptResourceUsageAllocatedRemotely.decUsed( - rmContainer.getAllocatedResource()); + public void removeRMContainer(ContainerId containerId) { + try { + writeLock.lock(); + RMContainer rmContainer = liveContainers.remove(containerId); + if (rmContainer != null && rmContainer.isRemotelyAllocated()) { + this.attemptResourceUsageAllocatedRemotely.decUsed( + rmContainer.getAllocatedResource()); + } + } finally { + writeLock.unlock(); } } - protected synchronized void resetReReservations( + protected void resetReReservations( SchedulerRequestKey schedulerKey) { reReservations.setCount(schedulerKey, 0); } - protected synchronized void addReReservation( + protected void addReReservation( SchedulerRequestKey schedulerKey) { reReservations.add(schedulerKey); } - public synchronized int getReReservations(SchedulerRequestKey schedulerKey) { + public int getReReservations(SchedulerRequestKey schedulerKey) { return reReservations.count(schedulerKey); } @@ -333,7 +374,7 @@ public synchronized int getReReservations(SchedulerRequestKey schedulerKey) { */ @Stable @Private - public synchronized Resource getCurrentReservation() { + public Resource getCurrentReservation() { return attemptResourceUsage.getReserved(); } @@ -341,28 +382,43 @@ public Queue getQueue() { return queue; } - public synchronized boolean updateResourceRequests( + public boolean updateResourceRequests( List requests) { - if (!isStopped) { - return appSchedulingInfo.updateResourceRequests(requests, false); + try { + writeLock.lock(); + if (!isStopped) { + return appSchedulingInfo.updateResourceRequests(requests, false); + } + return false; + } finally { + writeLock.unlock(); } - return false; } - public synchronized void recoverResourceRequestsForContainer( + public void recoverResourceRequestsForContainer( List requests) { - if (!isStopped) { - appSchedulingInfo.updateResourceRequests(requests, true); + try { + writeLock.lock(); + if (!isStopped) { + appSchedulingInfo.updateResourceRequests(requests, true); + } + } finally { + writeLock.unlock(); } } - public synchronized void stop(RMAppAttemptState rmAppAttemptFinalState) { - // Cleanup all scheduling 
information - isStopped = true; - appSchedulingInfo.stop(); + public void stop(RMAppAttemptState rmAppAttemptFinalState) { + try { + writeLock.lock(); + // Cleanup all scheduling information + isStopped = true; + appSchedulingInfo.stop(); + } finally { + writeLock.unlock(); + } } - public synchronized boolean isStopped() { + public boolean isStopped() { return isStopped; } @@ -370,29 +426,40 @@ public synchronized boolean isStopped() { * Get the list of reserved containers * @return All of the reserved containers. */ - public synchronized List getReservedContainers() { - List reservedContainers = new ArrayList(); - for (Map.Entry> e : - this.reservedContainers.entrySet()) { - reservedContainers.addAll(e.getValue().values()); + public List getReservedContainers() { + try { + readLock.lock(); + List reservedContainers = new ArrayList<>(); + for (Entry> e : + this.reservedContainers.entrySet()) { + reservedContainers.addAll(e.getValue().values()); + } + return reservedContainers; + } finally { + readLock.unlock(); } - return reservedContainers; + } - public synchronized boolean reserveIncreasedContainer(SchedulerNode node, + public boolean reserveIncreasedContainer(SchedulerNode node, SchedulerRequestKey schedulerKey, RMContainer rmContainer, Resource reservedResource) { - if (commonReserve(node, schedulerKey, rmContainer, reservedResource)) { - attemptResourceUsage.incReserved(node.getPartition(), - reservedResource); - // succeeded - return true; + try { + writeLock.lock(); + if (commonReserve(node, schedulerKey, rmContainer, reservedResource)) { + attemptResourceUsage.incReserved(node.getPartition(), reservedResource); + // succeeded + return true; + } + + return false; + } finally { + writeLock.unlock(); } - - return false; + } - private synchronized boolean commonReserve(SchedulerNode node, + private boolean commonReserve(SchedulerNode node, SchedulerRequestKey schedulerKey, RMContainer rmContainer, Resource reservedResource) { try { @@ -423,102 +490,107 @@ private synchronized boolean commonReserve(SchedulerNode node, return true; } - public synchronized RMContainer reserve(SchedulerNode node, + public RMContainer reserve(SchedulerNode node, SchedulerRequestKey schedulerKey, RMContainer rmContainer, Container container) { - // Create RMContainer if necessary - if (rmContainer == null) { - rmContainer = - new RMContainerImpl(container, getApplicationAttemptId(), - node.getNodeID(), appSchedulingInfo.getUser(), rmContext); - attemptResourceUsage.incReserved(node.getPartition(), - container.getResource()); - ((RMContainerImpl)rmContainer).setQueueName(this.getQueueName()); - - // Reset the re-reservation count - resetReReservations(schedulerKey); - } else { - // Note down the re-reservation - addReReservation(schedulerKey); + try { + writeLock.lock(); + // Create RMContainer if necessary + if (rmContainer == null) { + rmContainer = new RMContainerImpl(container, getApplicationAttemptId(), + node.getNodeID(), appSchedulingInfo.getUser(), rmContext); + attemptResourceUsage.incReserved(node.getPartition(), + container.getResource()); + ((RMContainerImpl) rmContainer).setQueueName(this.getQueueName()); + + // Reset the re-reservation count + resetReReservations(schedulerKey); + } else{ + // Note down the re-reservation + addReReservation(schedulerKey); + } + + commonReserve(node, schedulerKey, rmContainer, container.getResource()); + + return rmContainer; + } finally { + writeLock.unlock(); } - - commonReserve(node, schedulerKey, rmContainer, container.getResource()); - return rmContainer; } - 
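
Reviewer note: the hunks above establish the pattern used throughout this patch: each formerly synchronized method acquires the attempt-wide ReentrantReadWriteLock (read lock for lookups, write lock for mutations) and releases it in a finally block, so concurrent readers no longer serialize against each other. A minimal, self-contained sketch of the idiom, with illustrative names not taken from the patch:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    public class AttemptStateSketch {
      private final Map<String, String> liveContainers = new HashMap<>();
      private final ReentrantReadWriteLock.ReadLock readLock;
      private final ReentrantReadWriteLock.WriteLock writeLock;

      public AttemptStateSketch() {
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        readLock = lock.readLock();
        writeLock = lock.writeLock();
      }

      // Lookup: any number of threads may hold the read lock concurrently.
      public List<String> snapshotContainers() {
        readLock.lock();
        try {
          return new ArrayList<>(liveContainers.values());
        } finally {
          readLock.unlock();   // always released, even if the copy throws
        }
      }

      // Mutation: exclusive against all readers and other writers.
      public void addContainer(String id, String container) {
        writeLock.lock();
        try {
          liveContainers.put(id, container);
        } finally {
          writeLock.unlock();
        }
      }
    }

One stylistic difference: the patch calls lock() as the first statement inside the try rather than just before it. Both variants release correctly, because Lock.lock() on a ReentrantReadWriteLock does not throw, but lock-before-try is the more conventional ordering. Note also that fields read without any lock at all (isStopped, resourceLimit, queue) are made volatile, so lock-free getters still observe the latest write.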
/** - * Has the application reserved the given node at the - * given priority? - * @param node node to be checked - * @param schedulerKey scheduler key of reserved container - * @return true is reserved, false if not - */ - public synchronized boolean isReserved(SchedulerNode node, - SchedulerRequestKey schedulerKey) { - Map reservedContainers = - this.reservedContainers.get(schedulerKey); - if (reservedContainers != null) { - return reservedContainers.containsKey(node.getNodeID()); + public void setHeadroom(Resource globalLimit) { + try { + writeLock.lock(); + this.resourceLimit = Resources.componentwiseMax(globalLimit, + Resources.none()); + } finally { + writeLock.unlock(); } - return false; - } - - public synchronized void setHeadroom(Resource globalLimit) { - this.resourceLimit = globalLimit; + } /** * Get available headroom in terms of resources for the application's user. * @return available resource headroom */ - public synchronized Resource getHeadroom() { - // Corner case to deal with applications being slightly over-limit - if (resourceLimit.getMemorySize() < 0) { - resourceLimit.setMemorySize(0); - } - + public Resource getHeadroom() { return resourceLimit; } - public synchronized int getNumReservedContainers( + public int getNumReservedContainers( SchedulerRequestKey schedulerKey) { - Map reservedContainers = - this.reservedContainers.get(schedulerKey); - return (reservedContainers == null) ? 0 : reservedContainers.size(); + try { + readLock.lock(); + Map reservedContainers = this.reservedContainers.get( + schedulerKey); + return (reservedContainers == null) ? 0 : reservedContainers.size(); + } finally { + readLock.unlock(); + } } @SuppressWarnings("unchecked") - public synchronized void containerLaunchedOnNode(ContainerId containerId, + public void containerLaunchedOnNode(ContainerId containerId, NodeId nodeId) { - // Inform the container - RMContainer rmContainer = getRMContainer(containerId); - if (rmContainer == null) { - // Some unknown container sneaked into the system. Kill it. - rmContext.getDispatcher().getEventHandler() - .handle(new RMNodeCleanContainerEvent(nodeId, containerId)); - return; - } + try { + writeLock.lock(); + // Inform the container + RMContainer rmContainer = getRMContainer(containerId); + if (rmContainer == null) { + // Some unknown container sneaked into the system. Kill it. 
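
Reviewer note: the setHeadroom/getHeadroom change above is more than a lock swap. The old getHeadroom() mutated resourceLimit in place to clamp negative memory, a getter with side effects; the new code clamps once on the write path with Resources.componentwiseMax(globalLimit, Resources.none()) and leaves the getter as a plain read of a volatile field. Publishing a freshly computed value through a volatile reference is what makes the lock-free getter safe. An illustrative sketch (the Res type is a stand-in, not YARN's Resource):

    public final class HeadroomSketch {
      private volatile Res limit = new Res(0, 0);

      // Clamp once, at write time, so readers never observe negative headroom.
      public void setHeadroom(Res globalLimit) {
        limit = new Res(Math.max(globalLimit.memory, 0),
            Math.max(globalLimit.vcores, 0));   // componentwise max against zero
      }

      // Lock-free: a volatile read of an already-clamped snapshot.
      public Res getHeadroom() {
        return limit;
      }

      public static final class Res {
        final long memory;
        final int vcores;
        Res(long memory, int vcores) {
          this.memory = memory;
          this.vcores = vcores;
        }
      }
    }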
+ rmContext.getDispatcher().getEventHandler().handle( + new RMNodeCleanContainerEvent(nodeId, containerId)); + return; + } - rmContainer.handle(new RMContainerEvent(containerId, - RMContainerEventType.LAUNCHED)); + rmContainer.handle( + new RMContainerEvent(containerId, RMContainerEventType.LAUNCHED)); + } finally { + writeLock.unlock(); + } } - public synchronized void showRequests() { - if (LOG.isDebugEnabled()) { - for (SchedulerRequestKey schedulerKey : getSchedulerKeys()) { - Map requests = - getResourceRequests(schedulerKey); - if (requests != null) { - LOG.debug("showRequests:" + " application=" + getApplicationId() - + " headRoom=" + getHeadroom() + " currentConsumption=" - + attemptResourceUsage.getUsed().getMemorySize()); - for (ResourceRequest request : requests.values()) { + public void showRequests() { + try { + readLock.lock(); + if (LOG.isDebugEnabled()) { + for (SchedulerRequestKey schedulerKey : getSchedulerKeys()) { + Map requests = getResourceRequests( + schedulerKey); + if (requests != null) { LOG.debug("showRequests:" + " application=" + getApplicationId() - + " request=" + request); + + " headRoom=" + getHeadroom() + " currentConsumption=" + + attemptResourceUsage.getUsed().getMemorySize()); + for (ResourceRequest request : requests.values()) { + LOG.debug("showRequests:" + " application=" + getApplicationId() + + " request=" + request); + } } } } + } finally { + readLock.unlock(); } } @@ -572,54 +644,72 @@ private Container updateContainerAndNMToken(RMContainer rmContainer, // Create container token and update NMToken altogether, if either of them fails for // some reason like DNS unavailable, do not return this container and keep it // in the newlyAllocatedContainers waiting to be refetched. - public synchronized List pullNewlyAllocatedContainers() { - List returnContainerList = - new ArrayList(newlyAllocatedContainers.size()); - for (Iterator i = newlyAllocatedContainers.iterator(); i - .hasNext();) { - RMContainer rmContainer = i.next(); - Container updatedContainer = - updateContainerAndNMToken(rmContainer, true, false); - // Only add container to return list when it's not null. updatedContainer - // could be null when generate token failed, it can be caused by DNS - // resolving failed. - if (updatedContainer != null) { - returnContainerList.add(updatedContainer); - i.remove(); + public List pullNewlyAllocatedContainers() { + try { + writeLock.lock(); + List returnContainerList = new ArrayList( + newlyAllocatedContainers.size()); + for (Iterator i = newlyAllocatedContainers.iterator(); + i.hasNext(); ) { + RMContainer rmContainer = i.next(); + Container updatedContainer = updateContainerAndNMToken(rmContainer, + true, false); + // Only add container to return list when it's not null. updatedContainer + // could be null when generate token failed, it can be caused by DNS + // resolving failed. 
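
Reviewer note: pullNewlyAllocatedContainers() (begun above, completed in the next hunk) is a drain-with-retry: it walks the pending list with an explicit Iterator and removes only entries whose container and NM tokens were generated successfully; failures (e.g. transient DNS problems) stay queued for the next pull. A generic sketch of the shape, assuming the caller holds the owning object's write lock as the patch does:

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.function.UnaryOperator;

    public class OutboxSketch<T> {
      private final List<T> pending = new LinkedList<>();

      // Caller must hold the write lock guarding this object.
      public List<T> pullReady(UnaryOperator<T> tryConvert) {
        List<T> ready = new ArrayList<>(pending.size());
        for (Iterator<T> it = pending.iterator(); it.hasNext(); ) {
          T converted = tryConvert.apply(it.next());
          if (converted != null) {   // null means "not ready"; keep for retry
            ready.add(converted);
            it.remove();             // hand out each item at most once
          }
        }
        return ready;
      }
    }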
+ if (updatedContainer != null) { + returnContainerList.add(updatedContainer); + i.remove(); + } } + return returnContainerList; + } finally { + writeLock.unlock(); } - return returnContainerList; + } - private synchronized List pullNewlyUpdatedContainers( + private List pullNewlyUpdatedContainers( Map updatedContainerMap, boolean increase) { - List returnContainerList = - new ArrayList(updatedContainerMap.size()); - for (Iterator> i = - updatedContainerMap.entrySet().iterator(); i.hasNext();) { - RMContainer rmContainer = i.next().getValue(); - Container updatedContainer = - updateContainerAndNMToken(rmContainer, false, increase); - if (updatedContainer != null) { - returnContainerList.add(updatedContainer); - i.remove(); + try { + writeLock.lock(); + List returnContainerList = new ArrayList ( + updatedContainerMap.size()); + for (Iterator > i = + updatedContainerMap.entrySet().iterator(); i.hasNext(); ) { + RMContainer rmContainer = i.next().getValue(); + Container updatedContainer = updateContainerAndNMToken(rmContainer, + false, increase); + if (updatedContainer != null) { + returnContainerList.add(updatedContainer); + i.remove(); + } } + return returnContainerList; + } finally { + writeLock.unlock(); } - return returnContainerList; + } - public synchronized List pullNewlyIncreasedContainers() { + public List pullNewlyIncreasedContainers() { return pullNewlyUpdatedContainers(newlyIncreasedContainers, true); } - public synchronized List pullNewlyDecreasedContainers() { + public List pullNewlyDecreasedContainers() { return pullNewlyUpdatedContainers(newlyDecreasedContainers, false); } - public synchronized List pullUpdatedNMTokens() { - List returnList = new ArrayList(updatedNMTokens); - updatedNMTokens.clear(); - return returnList; + public List pullUpdatedNMTokens() { + try { + writeLock.lock(); + List returnList = new ArrayList<>(updatedNMTokens); + updatedNMTokens.clear(); + return returnList; + } finally { + writeLock.unlock(); + } + } public boolean isWaitingForAMContainer() { @@ -628,53 +718,62 @@ public boolean isWaitingForAMContainer() { return (!unmanagedAM && appAttempt.getMasterContainer() == null); } - public synchronized void updateBlacklist(List blacklistAdditions, + public void updateBlacklist(List blacklistAdditions, List blacklistRemovals) { - if (!isStopped) { - if (isWaitingForAMContainer()) { - // The request is for the AM-container, and the AM-container is launched - // by the system. So, update the places that are blacklisted by system - // (as opposed to those blacklisted by the application). - this.appSchedulingInfo.updatePlacesBlacklistedBySystem( - blacklistAdditions, blacklistRemovals); - } else { - this.appSchedulingInfo.updatePlacesBlacklistedByApp(blacklistAdditions, - blacklistRemovals); + try { + writeLock.lock(); + if (!isStopped) { + if (isWaitingForAMContainer()) { + // The request is for the AM-container, and the AM-container is launched + // by the system. So, update the places that are blacklisted by system + // (as opposed to those blacklisted by the application). 
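
Reviewer note: pullUpdatedNMTokens() above is the related copy-then-clear variant: the copy and the clear must execute as one atomic unit under the write lock, or a token could be handed out twice or lost between the two steps. A sketch with an illustrative token type:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    public class TokenOutboxSketch<T> {
      private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
      private final List<T> updated = new ArrayList<>();

      public void offer(T token) {
        lock.writeLock().lock();
        try {
          updated.add(token);
        } finally {
          lock.writeLock().unlock();
        }
      }

      // Copy-then-clear as one critical section: each token is pulled once.
      public List<T> pullAll() {
        lock.writeLock().lock();
        try {
          List<T> out = new ArrayList<>(updated);
          updated.clear();
          return out;
        } finally {
          lock.writeLock().unlock();
        }
      }
    }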
+ this.appSchedulingInfo.updatePlacesBlacklistedBySystem( + blacklistAdditions, blacklistRemovals); + } else{ + this.appSchedulingInfo.updatePlacesBlacklistedByApp( + blacklistAdditions, blacklistRemovals); + } } + } finally { + writeLock.unlock(); } } public boolean isPlaceBlacklisted(String resourceName) { - boolean forAMContainer = isWaitingForAMContainer(); - return this.appSchedulingInfo.isPlaceBlacklisted(resourceName, - forAMContainer); + try { + readLock.lock(); + boolean forAMContainer = isWaitingForAMContainer(); + return this.appSchedulingInfo.isPlaceBlacklisted(resourceName, + forAMContainer); + } finally { + readLock.unlock(); + } } - public synchronized int addMissedNonPartitionedRequestSchedulingOpportunity( + public int addMissedNonPartitionedRequestSchedulingOpportunity( SchedulerRequestKey schedulerKey) { - missedNonPartitionedReqSchedulingOpportunity.add(schedulerKey); - return missedNonPartitionedReqSchedulingOpportunity.count(schedulerKey); + return missedNonPartitionedReqSchedulingOpportunity.add(schedulerKey, 1) + 1; } - public synchronized void + public void resetMissedNonPartitionedRequestSchedulingOpportunity( SchedulerRequestKey schedulerKey) { missedNonPartitionedReqSchedulingOpportunity.setCount(schedulerKey, 0); } - public synchronized void addSchedulingOpportunity( + public void addSchedulingOpportunity( SchedulerRequestKey schedulerKey) { - int count = schedulingOpportunities.count(schedulerKey); - if (count < Integer.MAX_VALUE) { - schedulingOpportunities.setCount(schedulerKey, count + 1); + try { + schedulingOpportunities.add(schedulerKey, 1); + } catch (IllegalArgumentException e) { + // This happens when count = MAX_INT, ignore the exception } } - public synchronized void subtractSchedulingOpportunity( + public void subtractSchedulingOpportunity( SchedulerRequestKey schedulerKey) { - int count = schedulingOpportunities.count(schedulerKey) - 1; - this.schedulingOpportunities.setCount(schedulerKey, Math.max(count, 0)); + this.schedulingOpportunities.removeExactly(schedulerKey, 1); } /** @@ -684,7 +783,7 @@ public synchronized void subtractSchedulingOpportunity( * @param schedulerKey Scheduler Key * @return number of scheduling opportunities */ - public synchronized int getSchedulingOpportunities( + public int getSchedulingOpportunities( SchedulerRequestKey schedulerKey) { return schedulingOpportunities.count(schedulerKey); } @@ -696,16 +795,22 @@ public synchronized int getSchedulingOpportunities( * * @param schedulerKey The priority of the container scheduled. 
*/ - public synchronized void resetSchedulingOpportunities( + public void resetSchedulingOpportunities( SchedulerRequestKey schedulerKey) { resetSchedulingOpportunities(schedulerKey, System.currentTimeMillis()); } // used for continuous scheduling - public synchronized void resetSchedulingOpportunities( + public void resetSchedulingOpportunities( SchedulerRequestKey schedulerKey, long currentTimeMs) { - lastScheduledContainer.put(schedulerKey, currentTimeMs); - schedulingOpportunities.setCount(schedulerKey, 0); + try { + writeLock.lock(); + lastScheduledContainer.put(schedulerKey, currentTimeMs); + schedulingOpportunities.setCount(schedulerKey, 0); + } finally { + writeLock.unlock(); + } + } @VisibleForTesting @@ -713,7 +818,7 @@ void setSchedulingOpportunities(SchedulerRequestKey schedulerKey, int count) { schedulingOpportunities.setCount(schedulerKey, count); } - synchronized AggregateAppResourceUsage getRunningAggregateAppResourceUsage() { + private AggregateAppResourceUsage getRunningAggregateAppResourceUsage() { long currentTimeMillis = System.currentTimeMillis(); // Don't walk the whole container list if the resources were computed // recently. @@ -737,101 +842,123 @@ synchronized AggregateAppResourceUsage getRunningAggregateAppResourceUsage() { return new AggregateAppResourceUsage(lastMemorySeconds, lastVcoreSeconds); } - public synchronized ApplicationResourceUsageReport getResourceUsageReport() { - AggregateAppResourceUsage runningResourceUsage = - getRunningAggregateAppResourceUsage(); - Resource usedResourceClone = - Resources.clone(attemptResourceUsage.getAllUsed()); - Resource reservedResourceClone = - Resources.clone(attemptResourceUsage.getReserved()); - Resource cluster = rmContext.getScheduler().getClusterResource(); - ResourceCalculator calc = rmContext.getScheduler().getResourceCalculator(); - float queueUsagePerc = 0.0f; - float clusterUsagePerc = 0.0f; - if (!calc.isInvalidDivisor(cluster)) { - queueUsagePerc = - calc.divide(cluster, usedResourceClone, Resources.multiply(cluster, - queue.getQueueInfo(false, false).getCapacity())) * 100; - clusterUsagePerc = calc.divide(cluster, usedResourceClone, cluster) * 100; + public ApplicationResourceUsageReport getResourceUsageReport() { + try { + writeLock.lock(); + AggregateAppResourceUsage runningResourceUsage = + getRunningAggregateAppResourceUsage(); + Resource usedResourceClone = Resources.clone( + attemptResourceUsage.getAllUsed()); + Resource reservedResourceClone = Resources.clone( + attemptResourceUsage.getReserved()); + Resource cluster = rmContext.getScheduler().getClusterResource(); + ResourceCalculator calc = + rmContext.getScheduler().getResourceCalculator(); + float queueUsagePerc = 0.0f; + float clusterUsagePerc = 0.0f; + if (!calc.isInvalidDivisor(cluster)) { + queueUsagePerc = calc.divide(cluster, usedResourceClone, Resources + .multiply(cluster, queue.getQueueInfo(false, false).getCapacity())) + * 100; + clusterUsagePerc = calc.divide(cluster, usedResourceClone, cluster) + * 100; + } + return ApplicationResourceUsageReport.newInstance(liveContainers.size(), + reservedContainers.size(), usedResourceClone, reservedResourceClone, + Resources.add(usedResourceClone, reservedResourceClone), + runningResourceUsage.getMemorySeconds(), + runningResourceUsage.getVcoreSeconds(), queueUsagePerc, + clusterUsagePerc); + } finally { + writeLock.unlock(); } - return ApplicationResourceUsageReport.newInstance(liveContainers.size(), - reservedContainers.size(), usedResourceClone, reservedResourceClone, - 
Resources.add(usedResourceClone, reservedResourceClone), - runningResourceUsage.getMemorySeconds(), - runningResourceUsage.getVcoreSeconds(), queueUsagePerc, - clusterUsagePerc); } - public synchronized Map getLiveContainersMap() { + public Map getLiveContainersMap() { return this.liveContainers; } - public synchronized Resource getResourceLimit() { + public Resource getResourceLimit() { return this.resourceLimit; } - public synchronized Map + public Map getLastScheduledContainer() { return this.lastScheduledContainer; } - public synchronized void transferStateFromPreviousAttempt( + public void transferStateFromPreviousAttempt( SchedulerApplicationAttempt appAttempt) { - this.liveContainers = appAttempt.getLiveContainersMap(); - // this.reReservations = appAttempt.reReservations; - this.attemptResourceUsage.copyAllUsed(appAttempt.attemptResourceUsage); - this.resourceLimit = appAttempt.getResourceLimit(); - // this.currentReservation = appAttempt.currentReservation; - // this.newlyAllocatedContainers = appAttempt.newlyAllocatedContainers; - // this.schedulingOpportunities = appAttempt.schedulingOpportunities; - this.lastScheduledContainer = appAttempt.getLastScheduledContainer(); - this.appSchedulingInfo - .transferStateFromPreviousAppSchedulingInfo(appAttempt.appSchedulingInfo); + try { + writeLock.lock(); + this.liveContainers = appAttempt.getLiveContainersMap(); + // this.reReservations = appAttempt.reReservations; + this.attemptResourceUsage.copyAllUsed(appAttempt.attemptResourceUsage); + this.setHeadroom(appAttempt.getResourceLimit()); + // this.currentReservation = appAttempt.currentReservation; + // this.newlyAllocatedContainers = appAttempt.newlyAllocatedContainers; + // this.schedulingOpportunities = appAttempt.schedulingOpportunities; + this.lastScheduledContainer = appAttempt.getLastScheduledContainer(); + this.appSchedulingInfo.transferStateFromPreviousAppSchedulingInfo( + appAttempt.appSchedulingInfo); + } finally { + writeLock.unlock(); + } } - public synchronized void move(Queue newQueue) { - QueueMetrics oldMetrics = queue.getMetrics(); - QueueMetrics newMetrics = newQueue.getMetrics(); - String newQueueName = newQueue.getQueueName(); - String user = getUser(); - for (RMContainer liveContainer : liveContainers.values()) { - Resource resource = liveContainer.getContainer().getResource(); - ((RMContainerImpl)liveContainer).setQueueName(newQueueName); - oldMetrics.releaseResources(user, 1, resource); - newMetrics.allocateResources(user, 1, resource, false); - } - for (Map map : reservedContainers.values()) { - for (RMContainer reservedContainer : map.values()) { - ((RMContainerImpl)reservedContainer).setQueueName(newQueueName); - Resource resource = reservedContainer.getReservedResource(); - oldMetrics.unreserveResource(user, resource); - newMetrics.reserveResource(user, resource); + public void move(Queue newQueue) { + try { + writeLock.lock(); + QueueMetrics oldMetrics = queue.getMetrics(); + QueueMetrics newMetrics = newQueue.getMetrics(); + String newQueueName = newQueue.getQueueName(); + String user = getUser(); + for (RMContainer liveContainer : liveContainers.values()) { + Resource resource = liveContainer.getContainer().getResource(); + ((RMContainerImpl) liveContainer).setQueueName(newQueueName); + oldMetrics.releaseResources(user, 1, resource); + newMetrics.allocateResources(user, 1, resource, false); + } + for (Map map : reservedContainers.values()) { + for (RMContainer reservedContainer : map.values()) { + ((RMContainerImpl) 
reservedContainer).setQueueName(newQueueName); + Resource resource = reservedContainer.getReservedResource(); + oldMetrics.unreserveResource(user, resource); + newMetrics.reserveResource(user, resource); + } } - } - appSchedulingInfo.move(newQueue); - this.queue = newQueue; + appSchedulingInfo.move(newQueue); + this.queue = newQueue; + } finally { + writeLock.unlock(); + } } - public synchronized void recoverContainer(SchedulerNode node, + public void recoverContainer(SchedulerNode node, RMContainer rmContainer) { - // recover app scheduling info - appSchedulingInfo.recoverContainer(rmContainer); + try { + writeLock.lock(); + // recover app scheduling info + appSchedulingInfo.recoverContainer(rmContainer); - if (rmContainer.getState().equals(RMContainerState.COMPLETED)) { - return; + if (rmContainer.getState().equals(RMContainerState.COMPLETED)) { + return; + } + LOG.info("SchedulerAttempt " + getApplicationAttemptId() + + " is recovering container " + rmContainer.getContainerId()); + liveContainers.put(rmContainer.getContainerId(), rmContainer); + attemptResourceUsage.incUsed(node.getPartition(), + rmContainer.getContainer().getResource()); + + // resourceLimit: updated when LeafQueue#recoverContainer#allocateResource + // is called. + // newlyAllocatedContainers.add(rmContainer); + // schedulingOpportunities + // lastScheduledContainer + } finally { + writeLock.unlock(); } - LOG.info("SchedulerAttempt " + getApplicationAttemptId() - + " is recovering container " + rmContainer.getContainerId()); - liveContainers.put(rmContainer.getContainerId(), rmContainer); - attemptResourceUsage.incUsed(node.getPartition(), rmContainer - .getContainer().getResource()); - - // resourceLimit: updated when LeafQueue#recoverContainer#allocateResource - // is called. 
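
Reviewer note: the scheduling-opportunity counters converted a few hunks above lean on Guava's ConcurrentHashMultiset, whose individual operations are atomic, so the per-counter methods need no lock at all. Two details worth calling out: add(key, 1) returns the count before the addition (hence the "+ 1" in addMissedNonPartitionedRequestSchedulingOpportunity), and add() throws IllegalArgumentException rather than overflowing past Integer.MAX_VALUE, which addSchedulingOpportunity deliberately swallows to saturate. A compact sketch with an illustrative key type:

    import com.google.common.collect.ConcurrentHashMultiset;

    public class OpportunityCounterSketch {
      private final ConcurrentHashMultiset<String> counts =
          ConcurrentHashMultiset.create();

      // Atomically increment and report the new count.
      public int increment(String key) {
        return counts.add(key, 1) + 1;   // add() returns the pre-add count
      }

      // Saturating increment: pinned at Integer.MAX_VALUE instead of failing.
      public void incrementSaturating(String key) {
        try {
          counts.add(key, 1);
        } catch (IllegalArgumentException e) {
          // count is already Integer.MAX_VALUE; leave it there
        }
      }

      // Decrement that never goes negative.
      public void decrement(String key) {
        counts.removeExactly(key, 1);    // no-op (returns false) at zero
      }
    }

Compound updates still take the write lock in the patch (resetSchedulingOpportunities pairs a map put with setCount), since multiset atomicity does not extend across calls. Also note that transferStateFromPreviousAttempt calls setHeadroom() while already holding the write lock; that nested acquisition is safe because the write lock is reentrant, though the reverse pattern, upgrading a held read lock to the write lock, would deadlock.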
- // newlyAllocatedContainers.add(rmContainer); - // schedulingOpportunities - // lastScheduledContainer } public void incNumAllocatedContainers(NodeType containerType, @@ -915,49 +1042,64 @@ public ResourceUsage getSchedulingResourceUsage() { return attemptResourceUsage; } - public synchronized boolean removeIncreaseRequest(NodeId nodeId, + public boolean removeIncreaseRequest(NodeId nodeId, SchedulerRequestKey schedulerKey, ContainerId containerId) { - return appSchedulingInfo.removeIncreaseRequest(nodeId, schedulerKey, - containerId); + try { + writeLock.lock(); + return appSchedulingInfo.removeIncreaseRequest(nodeId, schedulerKey, + containerId); + } finally { + writeLock.unlock(); + } } - public synchronized boolean updateIncreaseRequests( + public boolean updateIncreaseRequests( List increaseRequests) { - return appSchedulingInfo.updateIncreaseRequests(increaseRequests); + try { + writeLock.lock(); + return appSchedulingInfo.updateIncreaseRequests(increaseRequests); + } finally { + writeLock.unlock(); + } } - private synchronized void changeContainerResource( + private void changeContainerResource( SchedContainerChangeRequest changeRequest, boolean increase) { - if (increase) { - appSchedulingInfo.increaseContainer(changeRequest); - } else { - appSchedulingInfo.decreaseContainer(changeRequest); - } + try { + writeLock.lock(); + if (increase) { + appSchedulingInfo.increaseContainer(changeRequest); + } else{ + appSchedulingInfo.decreaseContainer(changeRequest); + } - RMContainer changedRMContainer = changeRequest.getRMContainer(); - changedRMContainer.handle( - new RMContainerChangeResourceEvent(changeRequest.getContainerId(), - changeRequest.getTargetCapacity(), increase)); - - // remove pending and not pulled by AM newly-increased/decreased-containers - // and add the new one - if (increase) { - newlyDecreasedContainers.remove(changeRequest.getContainerId()); - newlyIncreasedContainers.put(changeRequest.getContainerId(), - changedRMContainer); - } else { - newlyIncreasedContainers.remove(changeRequest.getContainerId()); - newlyDecreasedContainers.put(changeRequest.getContainerId(), - changedRMContainer); + RMContainer changedRMContainer = changeRequest.getRMContainer(); + changedRMContainer.handle( + new RMContainerChangeResourceEvent(changeRequest.getContainerId(), + changeRequest.getTargetCapacity(), increase)); + + // remove pending and not pulled by AM newly-increased/decreased-containers + // and add the new one + if (increase) { + newlyDecreasedContainers.remove(changeRequest.getContainerId()); + newlyIncreasedContainers.put(changeRequest.getContainerId(), + changedRMContainer); + } else{ + newlyIncreasedContainers.remove(changeRequest.getContainerId()); + newlyDecreasedContainers.put(changeRequest.getContainerId(), + changedRMContainer); + } + } finally { + writeLock.unlock(); } } - public synchronized void decreaseContainer( + public void decreaseContainer( SchedContainerChangeRequest decreaseRequest) { changeContainerResource(decreaseRequest, false); } - public synchronized void increaseContainer( + public void increaseContainer( SchedContainerChangeRequest increaseRequest) { changeContainerResource(increaseRequest, true); } @@ -1025,7 +1167,7 @@ protected void setAttemptRecovering(boolean isRecovering) { this.isAttemptRecovering = isRecovering; } - public static enum AMState { + public enum AMState { UNMANAGED("User launched the Application Master, since it's unmanaged. "), INACTIVATED("Application is added to the scheduler and is not yet activated. 
"), ACTIVATED("Application is Activated, waiting for resources to be assigned for AM. "), diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java index 8d4042c..1a3f71f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java @@ -251,7 +251,7 @@ ContainerAllocation preAllocation(Resource clusterResource, return result; } - public synchronized float getLocalityWaitFactor( + public float getLocalityWaitFactor( SchedulerRequestKey schedulerKey, int clusterNodes) { // Estimate: Required unique resources (i.e. hosts + racks) int requiredResources = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index 9c84a23..32d2d2f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -99,7 +99,6 @@ * to hold the message if its app doesn't not get container from a node */ private String appSkipNodeDiagnostics; - private CapacitySchedulerContext capacitySchedulerContext; public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId, String user, Queue queue, ActiveUsersManager activeUsersManager, @@ -153,118 +152,128 @@ public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId, containerAllocator = new ContainerAllocator(this, rc, rmContext, activitiesManager); - - if (scheduler instanceof CapacityScheduler) { - capacitySchedulerContext = (CapacitySchedulerContext) scheduler; - } } - public synchronized boolean containerCompleted(RMContainer rmContainer, + public boolean containerCompleted(RMContainer rmContainer, ContainerStatus containerStatus, RMContainerEventType event, String partition) { - ContainerId containerId = rmContainer.getContainerId(); + try { + writeLock.lock(); + ContainerId containerId = rmContainer.getContainerId(); - // Remove from the list of containers - if (null == liveContainers.remove(containerId)) { - return false; - } + // Remove from the list of containers + if (null == liveContainers.remove(containerId)) { + return false; + } - // Remove from the list of newly allocated containers if found - newlyAllocatedContainers.remove(rmContainer); + // Remove from the list of newly allocated containers if found + newlyAllocatedContainers.remove(rmContainer); - // 
Inform the container - rmContainer.handle( - new RMContainerFinishedEvent(containerId, containerStatus, event)); + // Inform the container + rmContainer.handle( + new RMContainerFinishedEvent(containerId, containerStatus, event)); - containersToPreempt.remove(containerId); + containersToPreempt.remove(containerId); - Resource containerResource = rmContainer.getContainer().getResource(); - RMAuditLogger.logSuccess(getUser(), - AuditConstants.RELEASE_CONTAINER, "SchedulerApp", - getApplicationId(), containerId, containerResource); - - // Update usage metrics - queue.getMetrics().releaseResources(getUser(), 1, containerResource); - attemptResourceUsage.decUsed(partition, containerResource); + Resource containerResource = rmContainer.getContainer().getResource(); + RMAuditLogger.logSuccess(getUser(), AuditConstants.RELEASE_CONTAINER, + "SchedulerApp", getApplicationId(), containerId, containerResource); + + // Update usage metrics + queue.getMetrics().releaseResources(getUser(), 1, containerResource); + attemptResourceUsage.decUsed(partition, containerResource); - // Clear resource utilization metrics cache. - lastMemoryAggregateAllocationUpdateTime = -1; + // Clear resource utilization metrics cache. + lastMemoryAggregateAllocationUpdateTime = -1; - return true; + return true; + } finally { + writeLock.unlock(); + } } - public synchronized RMContainer allocate(NodeType type, FiCaSchedulerNode node, + public RMContainer allocate(NodeType type, FiCaSchedulerNode node, SchedulerRequestKey schedulerKey, ResourceRequest request, Container container) { + try { + writeLock.lock(); - if (isStopped) { - return null; - } - - // Required sanity check - AM can call 'allocate' to update resource - // request without locking the scheduler, hence we need to check - if (getTotalRequiredResources(schedulerKey) <= 0) { - return null; - } + if (isStopped) { + return null; + } + + // Required sanity check - AM can call 'allocate' to update resource + // request without locking the scheduler, hence we need to check + if (getTotalRequiredResources(schedulerKey) <= 0) { + return null; + } - // Create RMContainer - RMContainer rmContainer = - new RMContainerImpl(container, this.getApplicationAttemptId(), - node.getNodeID(), appSchedulingInfo.getUser(), this.rmContext, - request.getNodeLabelExpression()); - ((RMContainerImpl)rmContainer).setQueueName(this.getQueueName()); + // Create RMContainer + RMContainer rmContainer = new RMContainerImpl(container, + this.getApplicationAttemptId(), node.getNodeID(), + appSchedulingInfo.getUser(), this.rmContext, + request.getNodeLabelExpression()); + ((RMContainerImpl) rmContainer).setQueueName(this.getQueueName()); - updateAMContainerDiagnostics(AMState.ASSIGNED, null); + updateAMContainerDiagnostics(AMState.ASSIGNED, null); - // Add it to allContainers list. - newlyAllocatedContainers.add(rmContainer); + // Add it to allContainers list. 
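
Reviewer note: FiCaSchedulerApp's methods are converted in the same sweep as the base class, and completeness matters. A synchronized method left behind would not exclude the new lock-based sections, because the intrinsic monitor on "this" and a ReentrantReadWriteLock are unrelated; the synchronized (this) block that assignContainers used, replaced further below, is exactly such a case. A sketch of the hazard:

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    public class MixedGuardsSketch {
      private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
      private int counter;

      // BROKEN pair: these two increments can interleave freely, because the
      // intrinsic monitor and the explicit lock know nothing of each other.
      public synchronized void incrementWithMonitor() {
        counter++;
      }

      public void incrementWithLock() {
        lock.writeLock().lock();
        try {
          counter++;
        } finally {
          lock.writeLock().unlock();
        }
      }
    }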
+ newlyAllocatedContainers.add(rmContainer); - ContainerId containerId = container.getId(); - liveContainers.put(containerId, rmContainer); + ContainerId containerId = container.getId(); + liveContainers.put(containerId, rmContainer); - // Update consumption and track allocations - List resourceRequestList = appSchedulingInfo.allocate( - type, node, schedulerKey, request, container); + // Update consumption and track allocations + List resourceRequestList = appSchedulingInfo.allocate( + type, node, schedulerKey, request, container); - attemptResourceUsage.incUsed(node.getPartition(), container.getResource()); + attemptResourceUsage.incUsed(node.getPartition(), + container.getResource()); - // Update resource requests related to "request" and store in RMContainer - ((RMContainerImpl)rmContainer).setResourceRequests(resourceRequestList); + // Update resource requests related to "request" and store in RMContainer + ((RMContainerImpl) rmContainer).setResourceRequests(resourceRequestList); - // Inform the container - rmContainer.handle( - new RMContainerEvent(containerId, RMContainerEventType.START)); + // Inform the container + rmContainer.handle( + new RMContainerEvent(containerId, RMContainerEventType.START)); - if (LOG.isDebugEnabled()) { - LOG.debug("allocate: applicationAttemptId=" - + containerId.getApplicationAttemptId() - + " container=" + containerId + " host=" - + container.getNodeId().getHost() + " type=" + type); + if (LOG.isDebugEnabled()) { + LOG.debug("allocate: applicationAttemptId=" + containerId + .getApplicationAttemptId() + " container=" + containerId + " host=" + + container.getNodeId().getHost() + " type=" + type); + } + RMAuditLogger.logSuccess(getUser(), AuditConstants.ALLOC_CONTAINER, + "SchedulerApp", getApplicationId(), containerId, + container.getResource()); + + return rmContainer; + } finally { + writeLock.unlock(); } - RMAuditLogger.logSuccess(getUser(), - AuditConstants.ALLOC_CONTAINER, "SchedulerApp", - getApplicationId(), containerId, container.getResource()); - - return rmContainer; } - public synchronized boolean unreserve(SchedulerRequestKey schedulerKey, + public boolean unreserve(SchedulerRequestKey schedulerKey, FiCaSchedulerNode node, RMContainer rmContainer) { - // Cancel increase request (if it has reserved increase request - rmContainer.cancelIncreaseReservation(); - - // Done with the reservation? - if (internalUnreserve(node, schedulerKey)) { - node.unreserveResource(this); - - // Update reserved metrics - queue.getMetrics().unreserveResource(getUser(), - rmContainer.getReservedResource()); - queue.decReservedResource(node.getPartition(), - rmContainer.getReservedResource()); - return true; + try { + writeLock.lock(); + // Cancel increase request (if it has reserved increase request + rmContainer.cancelIncreaseReservation(); + + // Done with the reservation? + if (internalUnreserve(node, schedulerKey)) { + node.unreserveResource(this); + + // Update reserved metrics + queue.getMetrics().unreserveResource(getUser(), + rmContainer.getReservedResource()); + queue.decReservedResource(node.getPartition(), + rmContainer.getReservedResource()); + return true; + } + return false; + } finally { + writeLock.unlock(); } - return false; } private boolean internalUnreserve(FiCaSchedulerNode node, @@ -303,33 +312,15 @@ private boolean internalUnreserve(FiCaSchedulerNode node, return false; } - public synchronized float getLocalityWaitFactor( - SchedulerRequestKey schedulerKey, int clusterNodes) { - // Estimate: Required unique resources (i.e. 
hosts + racks) - int requiredResources = - Math.max(this.getResourceRequests(schedulerKey).size() - 1, 0); - - // waitFactor can't be more than '1' - // i.e. no point skipping more than clustersize opportunities - return Math.min(((float)requiredResources / clusterNodes), 1.0f); - } - - public synchronized Resource getTotalPendingRequests() { - Resource ret = Resource.newInstance(0, 0); - for (ResourceRequest rr : appSchedulingInfo.getAllResourceRequests()) { - // to avoid double counting we count only "ANY" resource requests - if (ResourceRequest.isAnyLocation(rr.getResourceName())){ - Resources.addTo(ret, - Resources.multiply(rr.getCapability(), rr.getNumContainers())); + public void markContainerForPreemption(ContainerId cont) { + try { + writeLock.lock(); + // ignore already completed containers + if (liveContainers.containsKey(cont)) { + containersToPreempt.add(cont); } - } - return ret; - } - - public synchronized void markContainerForPreemption(ContainerId cont) { - // ignore already completed containers - if (liveContainers.containsKey(cont)) { - containersToPreempt.add(cont); + } finally { + writeLock.unlock(); } } @@ -343,94 +334,115 @@ public synchronized void markContainerForPreemption(ContainerId cont) { * @param minimumAllocation * @return an allocation */ - public synchronized Allocation getAllocation(ResourceCalculator rc, + public Allocation getAllocation(ResourceCalculator rc, Resource clusterResource, Resource minimumAllocation) { - - Set currentContPreemption = Collections.unmodifiableSet( - new HashSet(containersToPreempt)); - containersToPreempt.clear(); - Resource tot = Resource.newInstance(0, 0); - for(ContainerId c : currentContPreemption){ - Resources.addTo(tot, - liveContainers.get(c).getContainer().getResource()); + try { + writeLock.lock(); + Set currentContPreemption = Collections.unmodifiableSet( + new HashSet(containersToPreempt)); + containersToPreempt.clear(); + Resource tot = Resource.newInstance(0, 0); + for (ContainerId c : currentContPreemption) { + Resources.addTo(tot, liveContainers.get(c).getContainer() + .getResource()); + } + int numCont = (int) Math.ceil( + Resources.divide(rc, clusterResource, tot, minimumAllocation)); + ResourceRequest rr = ResourceRequest.newInstance(Priority.UNDEFINED, + ResourceRequest.ANY, minimumAllocation, numCont); + List newlyAllocatedContainers = pullNewlyAllocatedContainers(); + List newlyIncreasedContainers = pullNewlyIncreasedContainers(); + List newlyDecreasedContainers = pullNewlyDecreasedContainers(); + List updatedNMTokens = pullUpdatedNMTokens(); + Resource headroom = getHeadroom(); + setApplicationHeadroomForMetrics(headroom); + return new Allocation(newlyAllocatedContainers, headroom, null, + currentContPreemption, Collections.singletonList(rr), updatedNMTokens, + newlyIncreasedContainers, newlyDecreasedContainers); + } finally { + writeLock.unlock(); } - int numCont = (int) Math.ceil( - Resources.divide(rc, clusterResource, tot, minimumAllocation)); - ResourceRequest rr = ResourceRequest.newInstance( - Priority.UNDEFINED, ResourceRequest.ANY, - minimumAllocation, numCont); - List newlyAllocatedContainers = pullNewlyAllocatedContainers(); - List newlyIncreasedContainers = pullNewlyIncreasedContainers(); - List newlyDecreasedContainers = pullNewlyDecreasedContainers(); - List updatedNMTokens = pullUpdatedNMTokens(); - Resource headroom = getHeadroom(); - setApplicationHeadroomForMetrics(headroom); - return new Allocation(newlyAllocatedContainers, headroom, null, - currentContPreemption, 
Collections.singletonList(rr), updatedNMTokens, - newlyIncreasedContainers, newlyDecreasedContainers); } - - synchronized public NodeId getNodeIdToUnreserve( + + @VisibleForTesting + public NodeId getNodeIdToUnreserve( SchedulerRequestKey schedulerKey, Resource resourceNeedUnreserve, ResourceCalculator rc, Resource clusterResource) { + try { + writeLock.lock(); + // first go around make this algorithm simple and just grab first + // reservation that has enough resources + Map reservedContainers = this.reservedContainers.get( + schedulerKey); + + if ((reservedContainers != null) && (!reservedContainers.isEmpty())) { + for (Map.Entry entry : reservedContainers + .entrySet()) { + NodeId nodeId = entry.getKey(); + RMContainer reservedContainer = entry.getValue(); + if (reservedContainer.hasIncreaseReservation()) { + // Currently, only regular container allocation supports continuous + // reservation looking, we don't support canceling increase request + // reservation when allocating regular container. + continue; + } - // first go around make this algorithm simple and just grab first - // reservation that has enough resources - Map reservedContainers = this.reservedContainers - .get(schedulerKey); - - if ((reservedContainers != null) && (!reservedContainers.isEmpty())) { - for (Map.Entry entry : reservedContainers.entrySet()) { - NodeId nodeId = entry.getKey(); - RMContainer reservedContainer = entry.getValue(); - if (reservedContainer.hasIncreaseReservation()) { - // Currently, only regular container allocation supports continuous - // reservation looking, we don't support canceling increase request - // reservation when allocating regular container. - continue; - } - - Resource reservedResource = reservedContainer.getReservedResource(); - - // make sure we unreserve one with at least the same amount of - // resources, otherwise could affect capacity limits - if (Resources.fitsIn(rc, clusterResource, resourceNeedUnreserve, - reservedResource)) { - if (LOG.isDebugEnabled()) { - LOG.debug("unreserving node with reservation size: " - + reservedResource - + " in order to allocate container with size: " + resourceNeedUnreserve); + Resource reservedResource = reservedContainer.getReservedResource(); + + // make sure we unreserve one with at least the same amount of + // resources, otherwise could affect capacity limits + if (Resources.fitsIn(rc, clusterResource, resourceNeedUnreserve, + reservedResource)) { + if (LOG.isDebugEnabled()) { + LOG.debug( + "unreserving node with reservation size: " + reservedResource + + " in order to allocate container with size: " + + resourceNeedUnreserve); + } + return nodeId; } - return nodeId; } } + return null; + } finally { + writeLock.unlock(); } - return null; } - public synchronized void setHeadroomProvider( + public void setHeadroomProvider( CapacityHeadroomProvider headroomProvider) { - this.headroomProvider = headroomProvider; - } - - public synchronized CapacityHeadroomProvider getHeadroomProvider() { - return headroomProvider; + try { + writeLock.lock(); + this.headroomProvider = headroomProvider; + } finally { + writeLock.unlock(); + } } @Override - public synchronized Resource getHeadroom() { - if (headroomProvider != null) { - return headroomProvider.getHeadroom(); + public Resource getHeadroom() { + try { + readLock.lock(); + if (headroomProvider != null) { + return headroomProvider.getHeadroom(); + } + return super.getHeadroom(); + } finally { + readLock.unlock(); } - return super.getHeadroom(); + } @Override - public synchronized void 
transferStateFromPreviousAttempt( + public void transferStateFromPreviousAttempt( SchedulerApplicationAttempt appAttempt) { - super.transferStateFromPreviousAttempt(appAttempt); - this.headroomProvider = - ((FiCaSchedulerApp) appAttempt).getHeadroomProvider(); + try { + writeLock.lock(); + super.transferStateFromPreviousAttempt(appAttempt); + this.headroomProvider = ((FiCaSchedulerApp) appAttempt).headroomProvider; + } finally { + writeLock.unlock(); + } } public boolean reserveIncreasedContainer(SchedulerRequestKey schedulerKey, @@ -444,11 +456,11 @@ public boolean reserveIncreasedContainer(SchedulerRequestKey schedulerKey, // Update the node node.reserveResource(this, schedulerKey, rmContainer); - + // Succeeded return true; } - + return false; } @@ -515,9 +527,12 @@ public CSAssignment assignContainers(Resource clusterResource, showRequests(); } - synchronized (this) { + try { + writeLock.lock(); return containerAllocator.assignContainers(clusterResource, node, schedulingMode, currentResourceLimits, reservedContainer); + } finally { + writeLock.unlock(); } } @@ -625,23 +640,33 @@ public void updateNodeInfoForAMDiagnostics(FiCaSchedulerNode node) { * Capacity Scheduler. */ @Override - public synchronized ApplicationResourceUsageReport getResourceUsageReport() { - ApplicationResourceUsageReport report = super.getResourceUsageReport(); - Resource cluster = rmContext.getScheduler().getClusterResource(); - Resource totalPartitionRes = - rmContext.getNodeLabelManager() - .getResourceByLabel(getAppAMNodePartitionName(), cluster); - ResourceCalculator calc = rmContext.getScheduler().getResourceCalculator(); - if (!calc.isInvalidDivisor(totalPartitionRes)) { - float queueAbsMaxCapPerPartition = - ((AbstractCSQueue)getQueue()).getQueueCapacities() - .getAbsoluteCapacity(getAppAMNodePartitionName()); - float queueUsagePerc = - calc.divide(totalPartitionRes, report.getUsedResources(), - Resources.multiply(totalPartitionRes, - queueAbsMaxCapPerPartition)) * 100; - report.setQueueUsagePercentage(queueUsagePerc); + public ApplicationResourceUsageReport getResourceUsageReport() { + try { + // Use write lock here because + // SchedulerApplicationAttempt#getResourceUsageReport updated fields + // TODO: improve this + writeLock.lock(); + ApplicationResourceUsageReport report = super.getResourceUsageReport(); + Resource cluster = rmContext.getScheduler().getClusterResource(); + Resource totalPartitionRes = + rmContext.getNodeLabelManager().getResourceByLabel( + getAppAMNodePartitionName(), cluster); + ResourceCalculator calc = + rmContext.getScheduler().getResourceCalculator(); + if (!calc.isInvalidDivisor(totalPartitionRes)) { + float queueAbsMaxCapPerPartition = + ((AbstractCSQueue) getQueue()).getQueueCapacities() + .getAbsoluteCapacity(getAppAMNodePartitionName()); + float queueUsagePerc = calc.divide(totalPartitionRes, + report.getUsedResources(), + Resources.multiply(totalPartitionRes, queueAbsMaxCapPerPartition)) + * 100; + report.setQueueUsagePercentage(queueUsagePerc); + } + return report; + } finally { + writeLock.unlock(); } - return report; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java index 9e5a807..f1fb055 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java @@ -123,65 +123,72 @@ public QueueMetrics getMetrics() { return queue.getMetrics(); } - synchronized public void containerCompleted(RMContainer rmContainer, + public void containerCompleted(RMContainer rmContainer, ContainerStatus containerStatus, RMContainerEventType event) { - - Container container = rmContainer.getContainer(); - ContainerId containerId = container.getId(); - - // Remove from the list of newly allocated containers if found - newlyAllocatedContainers.remove(rmContainer); - - // Inform the container - rmContainer.handle( - new RMContainerFinishedEvent( - containerId, - containerStatus, - event) - ); - if (LOG.isDebugEnabled()) { - LOG.debug("Completed container: " + rmContainer.getContainerId() + - " in state: " + rmContainer.getState() + " event:" + event); - } + try { + writeLock.lock(); + Container container = rmContainer.getContainer(); + ContainerId containerId = container.getId(); + + // Remove from the list of newly allocated containers if found + newlyAllocatedContainers.remove(rmContainer); + + // Inform the container + rmContainer.handle( + new RMContainerFinishedEvent(containerId, containerStatus, event)); + if (LOG.isDebugEnabled()) { + LOG.debug("Completed container: " + rmContainer.getContainerId() + + " in state: " + rmContainer.getState() + " event:" + event); + } + + // Remove from the list of containers + liveContainers.remove(rmContainer.getContainerId()); - // Remove from the list of containers - liveContainers.remove(rmContainer.getContainerId()); + Resource containerResource = rmContainer.getContainer().getResource(); + RMAuditLogger.logSuccess(getUser(), AuditConstants.RELEASE_CONTAINER, + "SchedulerApp", getApplicationId(), containerId, containerResource); - Resource containerResource = rmContainer.getContainer().getResource(); - RMAuditLogger.logSuccess(getUser(), - AuditConstants.RELEASE_CONTAINER, "SchedulerApp", - getApplicationId(), containerId, containerResource); - - // Update usage metrics - queue.getMetrics().releaseResources(getUser(), 1, containerResource); - this.attemptResourceUsage.decUsed(containerResource); + // Update usage metrics + queue.getMetrics().releaseResources(getUser(), 1, containerResource); + this.attemptResourceUsage.decUsed(containerResource); - // remove from preemption map if it is completed - preemptionMap.remove(rmContainer); + // remove from preemption map if it is completed + preemptionMap.remove(rmContainer); - // Clear resource utilization metrics cache. - lastMemoryAggregateAllocationUpdateTime = -1; + // Clear resource utilization metrics cache. 
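
Reviewer note: the "clear resource utilization metrics cache" step just above, like lastMemoryAggregateAllocationUpdateTime in the base class, is why getResourceUsageReport() takes the write lock even though it reads: getRunningAggregateAppResourceUsage() refreshes cached memory/vcore-seconds as a side effect (the patch leaves itself a TODO to improve this). The rule of thumb: a read that refreshes a cache is a write. A sketch, with the refresh interval as an assumed illustrative value:

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    public class CachedUsageSketch {
      private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
      private long lastUpdateMs = -1;
      private long cachedVcoreSeconds;

      public long getAggregateVcoreSeconds() {
        lock.writeLock().lock();   // write lock: this "getter" mutates the cache
        try {
          long now = System.currentTimeMillis();
          if (lastUpdateMs < 0 || now - lastUpdateMs > 3000L) {
            cachedVcoreSeconds = recompute();
            lastUpdateMs = now;
          }
          return cachedVcoreSeconds;
        } finally {
          lock.writeLock().unlock();
        }
      }

      private long recompute() {
        return 0L;   // stand-in for walking the live container list
      }
    }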
+ lastMemoryAggregateAllocationUpdateTime = -1; + } finally { + writeLock.unlock(); + } } - private synchronized void unreserveInternal( + private void unreserveInternal( SchedulerRequestKey schedulerKey, FSSchedulerNode node) { - Map reservedContainers = - this.reservedContainers.get(schedulerKey); - RMContainer reservedContainer = reservedContainers.remove(node.getNodeID()); - if (reservedContainers.isEmpty()) { - this.reservedContainers.remove(schedulerKey); - } - - // Reset the re-reservation count - resetReReservations(schedulerKey); + try { + writeLock.lock(); + Map reservedContainers = this.reservedContainers.get( + schedulerKey); + RMContainer reservedContainer = reservedContainers.remove( + node.getNodeID()); + if (reservedContainers.isEmpty()) { + this.reservedContainers.remove(schedulerKey); + } - Resource resource = reservedContainer.getContainer().getResource(); - this.attemptResourceUsage.decReserved(resource); + // Reset the re-reservation count + resetReReservations(schedulerKey); - LOG.info("Application " + getApplicationId() + " unreserved " + " on node " - + node + ", currently has " + reservedContainers.size() - + " at priority " + schedulerKey.getPriority() + "; currentReservation " - + this.attemptResourceUsage.getReserved()); + Resource resource = reservedContainer.getContainer().getResource(); + this.attemptResourceUsage.decReserved(resource); + + LOG.info( + "Application " + getApplicationId() + " unreserved " + " on node " + + node + ", currently has " + reservedContainers.size() + + " at priority " + schedulerKey.getPriority() + + "; currentReservation " + this.attemptResourceUsage + .getReserved()); + } finally { + writeLock.unlock(); + } } private void subtractResourcesOnBlacklistedNodes( @@ -239,17 +246,6 @@ public Resource getHeadroom() { return headroom; } - public synchronized float getLocalityWaitFactor( - SchedulerRequestKey schedulerKey, int clusterNodes) { - // Estimate: Required unique resources (i.e. hosts + racks) - int requiredResources = - Math.max(this.getResourceRequests(schedulerKey).size() - 1, 0); - - // waitFactor can't be more than '1' - // i.e. 
no point skipping more than clustersize opportunities - return Math.min(((float)requiredResources / clusterNodes), 1.0f); - } - /** * Return the level at which we are allowed to schedule containers, given the * current size of the cluster and thresholds indicating how many nodes to @@ -261,44 +257,54 @@ public synchronized float getLocalityWaitFactor( * @param rackLocalityThreshold rackLocalityThreshold * @return NodeType */ - public synchronized NodeType getAllowedLocalityLevel( + NodeType getAllowedLocalityLevel( SchedulerRequestKey schedulerKey, int numNodes, double nodeLocalityThreshold, double rackLocalityThreshold) { - // upper limit on threshold - if (nodeLocalityThreshold > 1.0) { nodeLocalityThreshold = 1.0; } - if (rackLocalityThreshold > 1.0) { rackLocalityThreshold = 1.0; } + try { + writeLock.lock(); + // upper limit on threshold + if (nodeLocalityThreshold > 1.0) { + nodeLocalityThreshold = 1.0; + } + if (rackLocalityThreshold > 1.0) { + rackLocalityThreshold = 1.0; + } - // If delay scheduling is not being used, can schedule anywhere - if (nodeLocalityThreshold < 0.0 || rackLocalityThreshold < 0.0) { - return NodeType.OFF_SWITCH; - } + // If delay scheduling is not being used, can schedule anywhere + if (nodeLocalityThreshold < 0.0 || rackLocalityThreshold < 0.0) { + return NodeType.OFF_SWITCH; + } - // Default level is NODE_LOCAL - if (!allowedLocalityLevel.containsKey(schedulerKey)) { - allowedLocalityLevel.put(schedulerKey, NodeType.NODE_LOCAL); - return NodeType.NODE_LOCAL; - } + // Default level is NODE_LOCAL + if (!allowedLocalityLevel.containsKey(schedulerKey)) { + allowedLocalityLevel.put(schedulerKey, NodeType.NODE_LOCAL); + return NodeType.NODE_LOCAL; + } - NodeType allowed = allowedLocalityLevel.get(schedulerKey); + NodeType allowed = allowedLocalityLevel.get(schedulerKey); - // If level is already most liberal, we're done - if (allowed.equals(NodeType.OFF_SWITCH)) return NodeType.OFF_SWITCH; + // If level is already most liberal, we're done + if (allowed.equals(NodeType.OFF_SWITCH)) + return NodeType.OFF_SWITCH; - double threshold = allowed.equals(NodeType.NODE_LOCAL) ? nodeLocalityThreshold : - rackLocalityThreshold; + double threshold = allowed.equals(NodeType.NODE_LOCAL) ? + nodeLocalityThreshold : + rackLocalityThreshold; - // Relax locality constraints once we've surpassed threshold. - if (getSchedulingOpportunities(schedulerKey) > (numNodes * threshold)) { - if (allowed.equals(NodeType.NODE_LOCAL)) { - allowedLocalityLevel.put(schedulerKey, NodeType.RACK_LOCAL); - resetSchedulingOpportunities(schedulerKey); - } - else if (allowed.equals(NodeType.RACK_LOCAL)) { - allowedLocalityLevel.put(schedulerKey, NodeType.OFF_SWITCH); - resetSchedulingOpportunities(schedulerKey); + // Relax locality constraints once we've surpassed threshold. 
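      // ------------------------------------------------------------------
      // [Editorial sketch, not part of the patch] The block below is the
      // Fair Scheduler's count-based delay scheduling: the allowed level
      // relaxes one step (NODE_LOCAL -> RACK_LOCAL -> OFF_SWITCH) only
      // after the app has skipped more than numNodes * threshold
      // scheduling opportunities at its current level, and the counter is
      // reset on each relaxation. Worked example (values hypothetical):
      // with numNodes = 100 and nodeLocalityThreshold = 0.5, the 51st
      // consecutive skipped offer while held at NODE_LOCAL drops the app
      // to RACK_LOCAL:
      //
      //   // 100 * 0.5 = 50; 51 > 50, so relax and reset the counter
      //   if (getSchedulingOpportunities(key) > numNodes * threshold) {
      //     allowedLocalityLevel.put(key, NodeType.RACK_LOCAL);
      //     resetSchedulingOpportunities(key);
      //   }
      // ------------------------------------------------------------------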
+ if (getSchedulingOpportunities(schedulerKey) > (numNodes * threshold)) { + if (allowed.equals(NodeType.NODE_LOCAL)) { + allowedLocalityLevel.put(schedulerKey, NodeType.RACK_LOCAL); + resetSchedulingOpportunities(schedulerKey); + } else if (allowed.equals(NodeType.RACK_LOCAL)) { + allowedLocalityLevel.put(schedulerKey, NodeType.OFF_SWITCH); + resetSchedulingOpportunities(schedulerKey); + } } + return allowedLocalityLevel.get(schedulerKey); + } finally { + writeLock.unlock(); } - return allowedLocalityLevel.get(schedulerKey); } /** @@ -311,120 +317,129 @@ else if (allowed.equals(NodeType.RACK_LOCAL)) { * @param currentTimeMs currentTimeMs * @return NodeType */ - public synchronized NodeType getAllowedLocalityLevelByTime( + NodeType getAllowedLocalityLevelByTime( SchedulerRequestKey schedulerKey, long nodeLocalityDelayMs, long rackLocalityDelayMs, long currentTimeMs) { + try { + writeLock.lock(); - // if not being used, can schedule anywhere - if (nodeLocalityDelayMs < 0 || rackLocalityDelayMs < 0) { - return NodeType.OFF_SWITCH; - } - - // default level is NODE_LOCAL - if (!allowedLocalityLevel.containsKey(schedulerKey)) { - // add the initial time of priority to prevent comparing with FsApp - // startTime and allowedLocalityLevel degrade - lastScheduledContainer.put(schedulerKey, currentTimeMs); - if (LOG.isDebugEnabled()) { - LOG.debug("Init the lastScheduledContainer time, priority: " - + schedulerKey.getPriority() + ", time: " + currentTimeMs); + // if not being used, can schedule anywhere + if (nodeLocalityDelayMs < 0 || rackLocalityDelayMs < 0) { + return NodeType.OFF_SWITCH; } - allowedLocalityLevel.put(schedulerKey, NodeType.NODE_LOCAL); - return NodeType.NODE_LOCAL; - } - NodeType allowed = allowedLocalityLevel.get(schedulerKey); + // default level is NODE_LOCAL + if (!allowedLocalityLevel.containsKey(schedulerKey)) { + // add the initial time of priority to prevent comparing with FsApp + // startTime and allowedLocalityLevel degrade + lastScheduledContainer.put(schedulerKey, currentTimeMs); + if (LOG.isDebugEnabled()) { + LOG.debug( + "Init the lastScheduledContainer time, priority: " + schedulerKey + .getPriority() + ", time: " + currentTimeMs); + } + allowedLocalityLevel.put(schedulerKey, NodeType.NODE_LOCAL); + return NodeType.NODE_LOCAL; + } - // if level is already most liberal, we're done - if (allowed.equals(NodeType.OFF_SWITCH)) { - return NodeType.OFF_SWITCH; - } + NodeType allowed = allowedLocalityLevel.get(schedulerKey); - // check waiting time - long waitTime = currentTimeMs; - if (lastScheduledContainer.containsKey(schedulerKey)) { - waitTime -= lastScheduledContainer.get(schedulerKey); - } else { - waitTime -= getStartTime(); - } + // if level is already most liberal, we're done + if (allowed.equals(NodeType.OFF_SWITCH)) { + return NodeType.OFF_SWITCH; + } - long thresholdTime = allowed.equals(NodeType.NODE_LOCAL) ? 
- nodeLocalityDelayMs : rackLocalityDelayMs; + // check waiting time + long waitTime = currentTimeMs; + if (lastScheduledContainer.containsKey(schedulerKey)) { + waitTime -= lastScheduledContainer.get(schedulerKey); + } else{ + waitTime -= getStartTime(); + } - if (waitTime > thresholdTime) { - if (allowed.equals(NodeType.NODE_LOCAL)) { - allowedLocalityLevel.put(schedulerKey, NodeType.RACK_LOCAL); - resetSchedulingOpportunities(schedulerKey, currentTimeMs); - } else if (allowed.equals(NodeType.RACK_LOCAL)) { - allowedLocalityLevel.put(schedulerKey, NodeType.OFF_SWITCH); - resetSchedulingOpportunities(schedulerKey, currentTimeMs); + long thresholdTime = allowed.equals(NodeType.NODE_LOCAL) ? + nodeLocalityDelayMs : + rackLocalityDelayMs; + + if (waitTime > thresholdTime) { + if (allowed.equals(NodeType.NODE_LOCAL)) { + allowedLocalityLevel.put(schedulerKey, NodeType.RACK_LOCAL); + resetSchedulingOpportunities(schedulerKey, currentTimeMs); + } else if (allowed.equals(NodeType.RACK_LOCAL)) { + allowedLocalityLevel.put(schedulerKey, NodeType.OFF_SWITCH); + resetSchedulingOpportunities(schedulerKey, currentTimeMs); + } } + return allowedLocalityLevel.get(schedulerKey); + } finally { + writeLock.unlock(); } - return allowedLocalityLevel.get(schedulerKey); } - synchronized public RMContainer allocate(NodeType type, FSSchedulerNode node, + public RMContainer allocate(NodeType type, FSSchedulerNode node, SchedulerRequestKey schedulerKey, ResourceRequest request, Container reservedContainer) { - // Update allowed locality level - NodeType allowed = allowedLocalityLevel.get(schedulerKey); - if (allowed != null) { - if (allowed.equals(NodeType.OFF_SWITCH) && - (type.equals(NodeType.NODE_LOCAL) || - type.equals(NodeType.RACK_LOCAL))) { - this.resetAllowedLocalityLevel(schedulerKey, type); + try { + writeLock.lock(); + // Update allowed locality level + NodeType allowed = allowedLocalityLevel.get(schedulerKey); + if (allowed != null) { + if (allowed.equals(NodeType.OFF_SWITCH) && (type.equals( + NodeType.NODE_LOCAL) || type.equals(NodeType.RACK_LOCAL))) { + this.resetAllowedLocalityLevel(schedulerKey, type); + } else if (allowed.equals(NodeType.RACK_LOCAL) && type.equals( + NodeType.NODE_LOCAL)) { + this.resetAllowedLocalityLevel(schedulerKey, type); + } } - else if (allowed.equals(NodeType.RACK_LOCAL) && - type.equals(NodeType.NODE_LOCAL)) { - this.resetAllowedLocalityLevel(schedulerKey, type); + + // Required sanity check - AM can call 'allocate' to update resource + // request without locking the scheduler, hence we need to check + if (getTotalRequiredResources(schedulerKey) <= 0) { + return null; } - } - // Required sanity check - AM can call 'allocate' to update resource - // request without locking the scheduler, hence we need to check - if (getTotalRequiredResources(schedulerKey) <= 0) { - return null; - } + Container container = reservedContainer; + if (container == null) { + container = createContainer(node, request.getCapability(), + schedulerKey); + } - Container container = reservedContainer; - if (container == null) { - container = - createContainer(node, request.getCapability(), schedulerKey); - } - - // Create RMContainer - RMContainer rmContainer = new RMContainerImpl(container, - getApplicationAttemptId(), node.getNodeID(), - appSchedulingInfo.getUser(), rmContext); - ((RMContainerImpl)rmContainer).setQueueName(this.getQueueName()); + // Create RMContainer + RMContainer rmContainer = new RMContainerImpl(container, + getApplicationAttemptId(), node.getNodeID(), + 
appSchedulingInfo.getUser(), rmContext); + ((RMContainerImpl) rmContainer).setQueueName(this.getQueueName()); - // Add it to allContainers list. - newlyAllocatedContainers.add(rmContainer); - liveContainers.put(container.getId(), rmContainer); + // Add it to allContainers list. + newlyAllocatedContainers.add(rmContainer); + liveContainers.put(container.getId(), rmContainer); - // Update consumption and track allocations - List resourceRequestList = appSchedulingInfo.allocate( - type, node, schedulerKey, request, container); - this.attemptResourceUsage.incUsed(container.getResource()); + // Update consumption and track allocations + List resourceRequestList = appSchedulingInfo.allocate( + type, node, schedulerKey, request, container); + this.attemptResourceUsage.incUsed(container.getResource()); - // Update resource requests related to "request" and store in RMContainer - ((RMContainerImpl) rmContainer).setResourceRequests(resourceRequestList); + // Update resource requests related to "request" and store in RMContainer + ((RMContainerImpl) rmContainer).setResourceRequests(resourceRequestList); - // Inform the container - rmContainer.handle( - new RMContainerEvent(container.getId(), RMContainerEventType.START)); + // Inform the container + rmContainer.handle( + new RMContainerEvent(container.getId(), RMContainerEventType.START)); - if (LOG.isDebugEnabled()) { - LOG.debug("allocate: applicationAttemptId=" - + container.getId().getApplicationAttemptId() - + " container=" + container.getId() + " host=" - + container.getNodeId().getHost() + " type=" + type); + if (LOG.isDebugEnabled()) { + LOG.debug("allocate: applicationAttemptId=" + container.getId() + .getApplicationAttemptId() + " container=" + container.getId() + + " host=" + container.getNodeId().getHost() + " type=" + type); + } + RMAuditLogger.logSuccess(getUser(), AuditConstants.ALLOC_CONTAINER, + "SchedulerApp", getApplicationId(), container.getId(), + container.getResource()); + + return rmContainer; + } finally { + writeLock.unlock(); } - RMAuditLogger.logSuccess(getUser(), - AuditConstants.ALLOC_CONTAINER, "SchedulerApp", - getApplicationId(), container.getId(), container.getResource()); - - return rmContainer; } /** @@ -434,19 +449,29 @@ else if (allowed.equals(NodeType.RACK_LOCAL) && * @param schedulerKey Scheduler Key * @param level NodeType */ - public synchronized void resetAllowedLocalityLevel( + public void resetAllowedLocalityLevel( SchedulerRequestKey schedulerKey, NodeType level) { - NodeType old = allowedLocalityLevel.get(schedulerKey); - LOG.info("Raising locality level from " + old + " to " + level + " at " + - " priority " + schedulerKey.getPriority()); - allowedLocalityLevel.put(schedulerKey, level); + try { + writeLock.lock(); + NodeType old = allowedLocalityLevel.get(schedulerKey); + LOG.info("Raising locality level from " + old + " to " + level + " at " + + " priority " + schedulerKey.getPriority()); + allowedLocalityLevel.put(schedulerKey, level); + } finally { + writeLock.unlock(); + } } // related methods public void addPreemption(RMContainer container, long time) { assert preemptionMap.get(container) == null; - preemptionMap.put(container, time); - Resources.addTo(preemptedResources, container.getAllocatedResource()); + try { + writeLock.lock(); + preemptionMap.put(container, time); + Resources.addTo(preemptedResources, container.getAllocatedResource()); + } finally { + writeLock.unlock(); + } } public Long getContainerPreemptionTime(RMContainer container) { @@ -584,21 +609,33 @@ public void 
unreserve(SchedulerRequestKey schedulerKey,
         getUser(), rmContainer.getContainer().getResource());
   }
 
-  private synchronized void setReservation(SchedulerNode node) {
-    String rackName = node.getRackName() == null ? "NULL" : node.getRackName();
-    Set<String> rackReservations = reservations.get(rackName);
-    if (rackReservations == null) {
-      rackReservations = new HashSet<>();
-      reservations.put(rackName, rackReservations);
+  private void setReservation(SchedulerNode node) {
+    try {
+      writeLock.lock();
+      String rackName =
+          node.getRackName() == null ? "NULL" : node.getRackName();
+      Set<String> rackReservations = reservations.get(rackName);
+      if (rackReservations == null) {
+        rackReservations = new HashSet<>();
+        reservations.put(rackName, rackReservations);
+      }
+      rackReservations.add(node.getNodeName());
+    } finally {
+      writeLock.unlock();
     }
-    rackReservations.add(node.getNodeName());
   }
 
-  private synchronized void clearReservation(SchedulerNode node) {
-    String rackName = node.getRackName() == null ? "NULL" : node.getRackName();
-    Set<String> rackReservations = reservations.get(rackName);
-    if (rackReservations != null) {
-      rackReservations.remove(node.getNodeName());
+  private void clearReservation(SchedulerNode node) {
+    try {
+      writeLock.lock();
+      String rackName =
+          node.getRackName() == null ? "NULL" : node.getRackName();
+      Set<String> rackReservations = reservations.get(rackName);
+      if (rackReservations != null) {
+        rackReservations.remove(node.getNodeName());
+      }
+    } finally {
+      writeLock.unlock();
     }
   }
 
@@ -737,7 +774,8 @@ private Resource assignContainer(FSSchedulerNode node, boolean reserved) {
     // For each priority, see if we can schedule a node local, rack local
     // or off-switch request. Rack or off-switch requests may be delayed
    // (not scheduled) in order to promote better locality.
-    synchronized (this) {
+    try {
+      writeLock.lock();
       for (SchedulerRequestKey schedulerKey : keysToTry) {
         // Skip it for reserved container, since
         // we already check it in isValidReservation.
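[Editorial note] This hunk and the one that follows move assignContainer's body under the write lock while preserving the Fair Scheduler's locality cascade: a node-local request wins outright (when both a node-local and a rack-local request are outstanding), a rack-local request is assigned only once the allowed level has relaxed to RACK_LOCAL or OFF_SWITCH, and an off-switch request only at OFF_SWITCH or when no more-local requests exist. A minimal standalone sketch of that selection order; the enum and method names here are illustrative, not the patch's API:

    // Simplified model of the cascade in FSAppAttempt#assignContainer.
    enum Level { NODE_LOCAL, RACK_LOCAL, OFF_SWITCH }

    final class LocalityCascade {
      // Returns the most local level that is both requested and permitted,
      // or null to skip this scheduler key and wait for better locality.
      static Level pick(boolean wantsNode, boolean wantsRack,
          boolean wantsOffSwitch, Level allowed) {
        if (wantsNode && wantsRack) {
          return Level.NODE_LOCAL;                  // most local wins
        }
        if (wantsRack && allowed != Level.NODE_LOCAL) {
          return Level.RACK_LOCAL;                  // permitted once relaxed
        }
        if (wantsOffSwitch
            && (!wantsNode && !wantsRack || allowed == Level.OFF_SWITCH)) {
          return Level.OFF_SWITCH;                  // last resort
        }
        return null;
      }
    }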
@@ -763,7 +801,7 @@ private Resource assignContainer(FSSchedulerNode node, boolean reserved) { scheduler.getNodeLocalityDelayMs(), scheduler.getRackLocalityDelayMs(), scheduler.getClock().getTime()); - } else { + } else{ allowedLocality = getAllowedLocalityLevel(schedulerKey, scheduler.getNumClusterNodes(), scheduler.getNodeLocalityThreshold(), @@ -772,8 +810,8 @@ private Resource assignContainer(FSSchedulerNode node, boolean reserved) { if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0 && localRequest != null && localRequest.getNumContainers() != 0) { - return assignContainer(node, localRequest, - NodeType.NODE_LOCAL, reserved, schedulerKey); + return assignContainer(node, localRequest, NodeType.NODE_LOCAL, + reserved, schedulerKey); } if (rackLocalRequest != null && !rackLocalRequest.getRelaxLocality()) { @@ -781,29 +819,31 @@ private Resource assignContainer(FSSchedulerNode node, boolean reserved) { } if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0 - && (allowedLocality.equals(NodeType.RACK_LOCAL) || - allowedLocality.equals(NodeType.OFF_SWITCH))) { - return assignContainer(node, rackLocalRequest, - NodeType.RACK_LOCAL, reserved, schedulerKey); + && (allowedLocality.equals(NodeType.RACK_LOCAL) || allowedLocality + .equals(NodeType.OFF_SWITCH))) { + return assignContainer(node, rackLocalRequest, NodeType.RACK_LOCAL, + reserved, schedulerKey); } - ResourceRequest offSwitchRequest = - getResourceRequest(schedulerKey, ResourceRequest.ANY); + ResourceRequest offSwitchRequest = getResourceRequest(schedulerKey, + ResourceRequest.ANY); if (offSwitchRequest != null && !offSwitchRequest.getRelaxLocality()) { continue; } - if (offSwitchRequest != null && - offSwitchRequest.getNumContainers() != 0) { - if (!hasNodeOrRackLocalRequests(schedulerKey) || - allowedLocality.equals(NodeType.OFF_SWITCH)) { - return assignContainer( - node, offSwitchRequest, NodeType.OFF_SWITCH, reserved, - schedulerKey); + if (offSwitchRequest != null + && offSwitchRequest.getNumContainers() != 0) { + if (!hasNodeOrRackLocalRequests(schedulerKey) || allowedLocality + .equals(NodeType.OFF_SWITCH)) { + return assignContainer(node, offSwitchRequest, NodeType.OFF_SWITCH, + reserved, schedulerKey); } } } + } finally { + writeLock.unlock(); } + return Resources.none(); } @@ -963,14 +1003,17 @@ public void updateDemand() { Resources.addTo(demand, getCurrentConsumption()); // Add up outstanding resource requests - synchronized (this) { + try { + writeLock.lock(); for (SchedulerRequestKey k : getSchedulerKeys()) { ResourceRequest r = getResourceRequest(k, ResourceRequest.ANY); if (r != null) { - Resources.multiplyAndAddTo(demand, - r.getCapability(), r.getNumContainers()); + Resources.multiplyAndAddTo(demand, r.getCapability(), + r.getNumContainers()); } } + } finally { + writeLock.unlock(); } }
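[Editorial summary, not part of the patch] Taken as a whole, YARN-3141 applies one mechanical pattern: each synchronized method or block on the application attempt becomes a read- or write-locked region over a single per-attempt ReentrantReadWriteLock; fields read without any lock become volatile; and plain HashMap/HashMultiset state becomes ConcurrentHashMap/ConcurrentHashMultiset. Note the conservative choice earlier in the patch: FiCaSchedulerApp#getResourceUsageReport takes the write lock, not the read lock, because the superclass call updates fields (the patch's own comment flags this with a TODO). A minimal self-contained sketch of the overall pattern, with hypothetical names (AttemptState, a String-to-Integer liveContainers map) standing in for the YARN classes:

    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    // Minimal sketch: coarse synchronized methods replaced by a read/write
    // lock, so read-mostly callers no longer serialize behind mutations.
    public class AttemptState {
      private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
      private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
      private final ReentrantReadWriteLock.WriteLock writeLock =
          lock.writeLock();

      // Concurrent map: safe for lock-free point reads; compound updates
      // are still guarded by the write lock.
      private final Map<String, Integer> liveContainers =
          new ConcurrentHashMap<>();

      // volatile: single-field reads and writes need no lock at all.
      private volatile boolean stopped = false;

      /** Readers may run in parallel with each other (was: synchronized). */
      public Collection<Integer> snapshotContainers() {
        readLock.lock();
        try {
          return new ArrayList<>(liveContainers.values());
        } finally {
          readLock.unlock();
        }
      }

      /** Writers are exclusive against both readers and other writers. */
      public void complete(String containerId) {
        writeLock.lock();
        try {
          liveContainers.remove(containerId);
          // ... update usage metrics, preemption map, caches ...
        } finally {
          writeLock.unlock();
        }
      }

      public boolean isStopped() {
        return stopped; // volatile read, no lock needed
      }
    }

The rule of thumb the patch follows: read lock for snapshot-style getters, write lock for anything that mutates (or calls code that may mutate), and volatile for independent single-field flags such as isStopped.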