diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index a565fe75656..2d3807bb09f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -31,11 +31,8 @@ import java.util.Set; import java.util.TreeSet; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; -import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; - import org.apache.commons.collections.keyvalue.DefaultMapEntry; +import org.apache.hadoop.util.AutoCloseableRWLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -110,9 +107,6 @@ private static final RecordFactory recordFactory = RecordFactoryProvider .getRecordFactory(null); - private final ReadLock readLock; - private final WriteLock writeLock; - private final ConcurrentLinkedQueue nodeUpdateQueue; private volatile boolean nextHeartBeat = true; @@ -134,7 +128,7 @@ private String nodeManagerVersion; private Integer decommissioningTimeout; - private long timeStamp; + private volatile long timeStamp; /* Aggregated resource utilization for the containers. */ private ResourceUtilization containersUtilization; /* Resource utilization for the node. 
*/ @@ -377,6 +371,9 @@ private final StateMachine stateMachine; + // autocloseable reentrant read write lock + protected final AutoCloseableRWLock lock; + public RMNodeImpl(NodeId nodeId, RMContext context, String hostName, int cmPort, int httpPort, Node node, Resource capability, String nodeManagerVersion) { @@ -387,6 +384,7 @@ public RMNodeImpl(NodeId nodeId, RMContext context, String hostName, public RMNodeImpl(NodeId nodeId, RMContext context, String hostName, int cmPort, int httpPort, Node node, Resource capability, String nodeManagerVersion, Resource physResource) { + this.lock = new AutoCloseableRWLock(); this.nodeId = nodeId; this.context = context; this.hostName = hostName; @@ -404,10 +402,6 @@ public RMNodeImpl(NodeId nodeId, RMContext context, String hostName, this.latestNodeHeartBeatResponse.setResponseId(0); - ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); - this.readLock = lock.readLock(); - this.writeLock = lock.writeLock(); - this.stateMachine = stateMachineFactory.make(this); this.nodeUpdateQueue = new ConcurrentLinkedQueue(); @@ -482,43 +476,23 @@ public Node getNode() { @Override public String getHealthReport() { - this.readLock.lock(); - - try { + try (AutoCloseableRWLock l = lock.acquireRead()) { return this.healthReport; - } finally { - this.readLock.unlock(); } } - public void setHealthReport(String healthReport) { - this.writeLock.lock(); - - try { - this.healthReport = healthReport; - } finally { - this.writeLock.unlock(); - } + private void setHealthReportInternal(String healthReport) { + this.healthReport = healthReport; } - public void setLastHealthReportTime(long lastHealthReportTime) { - this.writeLock.lock(); - - try { - this.lastHealthReportTime = lastHealthReportTime; - } finally { - this.writeLock.unlock(); - } + private void setLastHealthReportTimeInternal(long lastHealthReportTime) { + this.lastHealthReportTime = lastHealthReportTime; } - + @Override public long getLastHealthReportTime() { - this.readLock.lock(); - - try { + try (AutoCloseableRWLock l = lock.acquireRead()) { return this.lastHealthReportTime; - } finally { - this.readLock.unlock(); } } @@ -529,45 +503,25 @@ public String getNodeManagerVersion() { @Override public ResourceUtilization getAggregatedContainersUtilization() { - this.readLock.lock(); - - try { + try (AutoCloseableRWLock l = lock.acquireRead()) { return this.containersUtilization; - } finally { - this.readLock.unlock(); } } - public void setAggregatedContainersUtilization( + private void setAggregatedContainersUtilizationInternal( ResourceUtilization containersUtilization) { - this.writeLock.lock(); - - try { - this.containersUtilization = containersUtilization; - } finally { - this.writeLock.unlock(); - } + this.containersUtilization = containersUtilization; } @Override public ResourceUtilization getNodeUtilization() { - this.readLock.lock(); - - try { + try (AutoCloseableRWLock l = lock.acquireRead()) { return this.nodeUtilization; - } finally { - this.readLock.unlock(); } } - public void setNodeUtilization(ResourceUtilization nodeUtilization) { - this.writeLock.lock(); - - try { - this.nodeUtilization = nodeUtilization; - } finally { - this.writeLock.unlock(); - } + private void setNodeUtilizationInternal(ResourceUtilization nodeUtilization) { + this.nodeUtilization = nodeUtilization; } @Override @@ -581,55 +535,36 @@ public void setPhysicalResource(Resource physicalResource) { @Override public NodeState getState() { - this.readLock.lock(); - - try { + try (AutoCloseableRWLock l = lock.acquireRead()) { return 
this.stateMachine.getCurrentState(); - } finally { - this.readLock.unlock(); } } @Override public List getAppsToCleanup() { - this.readLock.lock(); - - try { + try (AutoCloseableRWLock l = lock.acquireRead()) { return new ArrayList(this.finishedApplications); - } finally { - this.readLock.unlock(); } - } @Override public List getRunningApps() { - this.readLock.lock(); - try { + try (AutoCloseableRWLock l = lock.acquireRead()) { return new ArrayList(this.runningApplications); - } finally { - this.readLock.unlock(); } } @Override public List getContainersToCleanUp() { - - this.readLock.lock(); - - try { + try (AutoCloseableRWLock l = lock.acquireRead()) { return new ArrayList(this.containersToClean); - } finally { - this.readLock.unlock(); } - }; + } @Override public void setAndUpdateNodeHeartbeatResponse( NodeHeartbeatResponse response) { - this.writeLock.lock(); - - try { + try (AutoCloseableRWLock l = lock.acquireWrite()) { response.addAllContainersToCleanup( new ArrayList(this.containersToClean)); response.addAllApplicationsToCleanup(this.finishedApplications); @@ -652,10 +587,8 @@ public void setAndUpdateNodeHeartbeatResponse( // Synchronously update the last response in rmNode with updated // responseId this.latestNodeHeartBeatResponse = response; - } finally { - this.writeLock.unlock(); } - }; + } @VisibleForTesting public Collection getToBeUpdatedContainers() { @@ -664,28 +597,21 @@ public void setAndUpdateNodeHeartbeatResponse( @Override public NodeHeartbeatResponse getLastNodeHeartBeatResponse() { - this.readLock.lock(); - try { + try (AutoCloseableRWLock l = lock.acquireRead()) { return this.latestNodeHeartBeatResponse; - } finally { - this.readLock.unlock(); } } @Override public void resetLastNodeHeartBeatResponse() { - this.writeLock.lock(); - try { + try (AutoCloseableRWLock l = lock.acquireWrite()) { latestNodeHeartBeatResponse.setResponseId(0); - } finally { - this.writeLock.unlock(); } } public void handle(RMNodeEvent event) { LOG.debug("Processing {} of type {}", event.getNodeId(), event.getType()); - writeLock.lock(); - try { + try (AutoCloseableRWLock l = lock.acquireWrite()) { NodeState oldState = getState(); try { stateMachine.doTransition(event.getType(), event); @@ -699,10 +625,6 @@ public void handle(RMNodeEvent event) { + getState()); } } - - finally { - writeLock.unlock(); - } } private void updateMetricsForRejoinedNode(NodeState previousNodeState) { @@ -836,17 +758,21 @@ private static void updateNodeResourceFromEvent(RMNodeImpl rmNode, rmNode.updatedCapability = true; } private static NodeHealthStatus updateRMNodeFromStatusEvents( RMNodeImpl rmNode, RMNodeStatusEvent statusEvent) { - // Switch the last heartbeatresponse. - NodeHealthStatus remoteNodeHealthStatus = statusEvent.getNodeHealthStatus(); - rmNode.setHealthReport(remoteNodeHealthStatus.getHealthReport()); - rmNode.setLastHealthReportTime(remoteNodeHealthStatus - .getLastHealthReportTime()); - rmNode.setAggregatedContainersUtilization(statusEvent - .getAggregatedContainersUtilization()); - rmNode.setNodeUtilization(statusEvent.getNodeUtilization()); - return remoteNodeHealthStatus; + // acquire the write lock once for the whole sequence of updates + try (AutoCloseableRWLock l = rmNode.lock.acquireWrite()) { + // Switch the last heartbeatresponse. 
+ NodeHealthStatus remoteNodeHealthStatus = statusEvent + .getNodeHealthStatus(); + rmNode.setHealthReportInternal(remoteNodeHealthStatus.getHealthReport()); + rmNode.setLastHealthReportTimeInternal(remoteNodeHealthStatus + .getLastHealthReportTime()); + rmNode.setAggregatedContainersUtilizationInternal(statusEvent + .getAggregatedContainersUtilization()); + rmNode.setNodeUtilizationInternal(statusEvent.getNodeUtilization()); + return remoteNodeHealthStatus; + } } public static class AddNodeTransition implements @@ -1287,7 +1218,6 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) { @Override public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) { RMNodeStatusEvent statusEvent = (RMNodeStatusEvent)event; - // Switch the last heartbeatresponse. NodeHealthStatus remoteNodeHealthStatus = updateRMNodeFromStatusEvents( rmNode, statusEvent); @@ -1527,8 +1457,7 @@ private void handleLogAggregationStatus( @Override public List pullNewlyIncreasedContainers() { - writeLock.lock(); - try { + try (AutoCloseableRWLock l = lock.acquireWrite()) { if (nmReportedIncreasedContainers.isEmpty()) { return Collections.emptyList(); } else { @@ -1537,9 +1466,6 @@ private void handleLogAggregationStatus( nmReportedIncreasedContainers.clear(); return container; } - - } finally { - writeLock.unlock(); } } @@ -1548,23 +1474,15 @@ public Resource getOriginalTotalCapability() { } public OpportunisticContainersStatus getOpportunisticContainersStatus() { - this.readLock.lock(); - - try { + try (AutoCloseableRWLock l = lock.acquireRead()) { return this.opportunisticContainersStatus; - } finally { - this.readLock.unlock(); } } public void setOpportunisticContainersStatus( OpportunisticContainersStatus opportunisticContainersStatus) { - this.writeLock.lock(); - - try { + try (AutoCloseableRWLock l = lock.acquireWrite()) { this.opportunisticContainersStatus = opportunisticContainersStatus; - } finally { - this.writeLock.unlock(); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java index ef03aadf1a0..5f378b0cd28 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java @@ -28,6 +28,7 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.commons.lang3.builder.CompareToBuilder; +import org.apache.hadoop.util.AutoCloseableRWLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -90,8 +91,12 @@ // Last updated time private volatile long lastHeartbeatMonotonicTime; + // autocloseable reentrant read write lock + protected AutoCloseableRWLock lock; + public SchedulerNode(RMNode node, boolean usePortForNodeName, Set labels) { + lock = new AutoCloseableRWLock(); this.rmNode = node; this.rmContext = node.getRMContext(); this.unallocatedResource = Resources.clone(node.getTotalCapability()); @@ -117,10 +122,12 @@ public RMNode getRMNode() { * Set total resources on the node. 
* @param resource Total resources on the node. */ - public synchronized void updateTotalResource(Resource resource){ - this.totalResource = resource; - this.unallocatedResource = Resources.subtract(totalResource, - this.allocatedResource); + public void updateTotalResource(Resource resource) { + try (AutoCloseableRWLock l = lock.acquireWrite()) { + this.totalResource = resource; + this.unallocatedResource = Resources.subtract(totalResource, + this.allocatedResource); + } } /** @@ -129,13 +136,15 @@ public synchronized void updateTotalResource(Resource resource){ * are not overcommitted anymore. This may reset a previous timeout. * @param timeOut Time out in milliseconds. */ - public synchronized void setOvercommitTimeOut(long timeOut) { - if (timeOut >= 0) { - if (this.overcommitTimeout != -1) { - LOG.debug("The overcommit timeout for {} was already set to {}", - getNodeID(), this.overcommitTimeout); + public void setOvercommitTimeOut(long timeOut) { + try (AutoCloseableRWLock l = lock.acquireWrite()) { + if (timeOut >= 0) { + if (this.overcommitTimeout != -1) { + LOG.debug("The overcommit timeout for {} was already set to {}", + getNodeID(), this.overcommitTimeout); + } + this.overcommitTimeout = Time.now() + timeOut; } - this.overcommitTimeout = Time.now() + timeOut; } } @@ -143,16 +152,21 @@ public synchronized void setOvercommitTimeOut(long timeOut) { * Check if the time out has passed. * @return If the node is overcommitted. */ - public synchronized boolean isOvercommitTimedOut() { - return this.overcommitTimeout >= 0 && Time.now() >= this.overcommitTimeout; + public boolean isOvercommitTimedOut() { + try (AutoCloseableRWLock l = lock.acquireRead()) { + return this.overcommitTimeout >= 0 + && Time.now() >= this.overcommitTimeout; + } } /** * Check if the node has a time out for overcommit resources. * @return If the node has a time out for overcommit resources. */ - public synchronized boolean isOvercommitTimeOutSet() { - return this.overcommitTimeout >= 0; + public boolean isOvercommitTimeOutSet() { + try (AutoCloseableRWLock l = lock.acquireRead()) { + return this.overcommitTimeout >= 0; + } } /** @@ -208,11 +222,19 @@ public void allocateContainer(RMContainer rmContainer) { * @param rmContainer Allocated container * @param launchedOnNode True if the container has been launched */ - protected synchronized void allocateContainer(RMContainer rmContainer, + protected void allocateContainer(RMContainer rmContainer, + boolean launchedOnNode) { + try (AutoCloseableRWLock l = lock.acquireWrite()) { + allocateContainerInternal(rmContainer, launchedOnNode); + } + } + + protected void allocateContainerInternal(RMContainer rmContainer, boolean launchedOnNode) { + assert (lock.isExcLockedByCurrentThread()); Container container = rmContainer.getContainer(); if (rmContainer.getExecutionType() == ExecutionType.GUARANTEED) { - deductUnallocatedResource(container.getResource()); + deductUnallocatedResourceInternal(container.getResource()); ++numContainers; } @@ -224,43 +246,49 @@ protected synchronized void allocateContainer(RMContainer rmContainer, * Get unallocated resources on the node. * @return Unallocated resources on the node */ - public synchronized Resource getUnallocatedResource() { - return this.unallocatedResource; + public Resource getUnallocatedResource() { + try (AutoCloseableRWLock l = lock.acquireRead()) { + return this.unallocatedResource; + } } /** * Get allocated resources on the node. 
* @return Allocated resources on the node */ - public synchronized Resource getAllocatedResource() { - return this.allocatedResource; + public Resource getAllocatedResource() { + try (AutoCloseableRWLock l = lock.acquireRead()) { + return this.allocatedResource; + } } /** * Get total resources on the node. * @return Total resources on the node. */ - public synchronized Resource getTotalResource() { - return this.totalResource; + public Resource getTotalResource() { + try (AutoCloseableRWLock l = lock.acquireRead()) { + return this.totalResource; + } } /** * Check if a container is launched by this node. * @return If the container is launched by the node. */ - public synchronized boolean isValidContainer(ContainerId containerId) { - if (launchedContainers.containsKey(containerId)) { - return true; + public boolean isValidContainer(ContainerId containerId) { + try (AutoCloseableRWLock l = lock.acquireRead()) { + return (launchedContainers.containsKey(containerId)); } - return false; } /** * Update the resources of the node when releasing a container. * @param container Container to release. */ - protected synchronized void updateResourceForReleasedContainer( + protected void updateResourceForReleasedContainer( Container container) { + assert (lock.isExcLockedByCurrentThread()); if (container.getExecutionType() == ExecutionType.GUARANTEED) { addUnallocatedResource(container.getResource()); --numContainers; @@ -272,19 +300,38 @@ protected synchronized void updateResourceForReleasedContainer( * @param containerId ID of container to be released. * @param releasedByNode whether the release originates from a node update. */ - public synchronized void releaseContainer(ContainerId containerId, + public void releaseContainer(ContainerId containerId, + boolean releasedByNode) { + Container container = null; + try (AutoCloseableRWLock l = lock.acquireWrite()) { + container = releaseContainerInternal(containerId, releasedByNode); + } + + if (LOG.isDebugEnabled() && container != null) { + LOG.debug("Released container " + container.getId() + " of capacity " + + container.getResource() + " on host " + rmNode.getNodeAddress() + + ", which currently has " + numContainers + " containers, " + + getAllocatedResource() + " used and " + getUnallocatedResource() + + " available" + ", release resources=" + true); + } + } + + protected Container releaseContainerInternal(ContainerId containerId, boolean releasedByNode) { + assert (lock.isExcLockedByCurrentThread()); + Container container = null; + ContainerInfo info = launchedContainers.get(containerId); if (info == null) { - return; + return null; } if (!releasedByNode && info.launchedOnNode) { // wait until node reports container has completed - return; + return null; } launchedContainers.remove(containerId); - Container container = info.container.getContainer(); + container = info.container.getContainer(); // We remove allocation tags when a container is actually // released on NM. 
This is to avoid running into situation @@ -299,23 +346,20 @@ public synchronized void releaseContainer(ContainerId containerId, updateResourceForReleasedContainer(container); - if (LOG.isDebugEnabled()) { - LOG.debug("Released container " + container.getId() + " of capacity " - + container.getResource() + " on host " + rmNode.getNodeAddress() - + ", which currently has " + numContainers + " containers, " - + getAllocatedResource() + " used and " + getUnallocatedResource() - + " available" + ", release resources=" + true); - } + return container; + } /** * Inform the node that a container has launched. * @param containerId ID of the launched container */ - public synchronized void containerStarted(ContainerId containerId) { - ContainerInfo info = launchedContainers.get(containerId); - if (info != null) { - info.launchedOnNode = true; + public void containerStarted(ContainerId containerId) { + try (AutoCloseableRWLock l = lock.acquireWrite()) { + ContainerInfo info = launchedContainers.get(containerId); + if (info != null) { + info.launchedOnNode = true; + } } } @@ -324,7 +368,8 @@ public synchronized void containerStarted(ContainerId containerId) { * container. * @param resource Resources to add. */ - private synchronized void addUnallocatedResource(Resource resource) { + private void addUnallocatedResource(Resource resource) { + assert (lock.isExcLockedByCurrentThread()); if (resource == null) { LOG.error("Invalid resource addition of null resource for " + rmNode.getNodeAddress()); @@ -340,7 +385,14 @@ private synchronized void addUnallocatedResource(Resource resource) { * @param resource Resources to deduct. */ @VisibleForTesting - public synchronized void deductUnallocatedResource(Resource resource) { + public void deductUnallocatedResource(Resource resource) { + try (AutoCloseableRWLock l = lock.acquireWrite()) { + deductUnallocatedResourceInternal(resource); + } + } + + protected void deductUnallocatedResourceInternal(Resource resource) { + assert (lock.isExcLockedByCurrentThread()); if (resource == null) { LOG.error("Invalid deduction of null resource for " + rmNode.getNodeAddress()); @@ -384,10 +436,13 @@ public int getNumContainers() { * Get the containers running on the node. * @return A copy of containers running on the node. */ - public synchronized List getCopiedListOfRunningContainers() { - List result = new ArrayList<>(launchedContainers.size()); - for (ContainerInfo info : launchedContainers.values()) { - result.add(info.container); + public List getCopiedListOfRunningContainers() { + List result; + try (AutoCloseableRWLock l = lock.acquireRead()) { + result = new ArrayList<>(launchedContainers.size()); + for (ContainerInfo info : launchedContainers.values()) { + result.add(info.container); + } } return result; } @@ -396,16 +453,18 @@ public int getNumContainers() { * Get the containers running on the node with AM containers at the end. * @return A copy of running containers with AM containers at the end. 
*/ - public synchronized List getRunningContainersWithAMsAtTheEnd() { - LinkedList result = new LinkedList<>(); - for (ContainerInfo info : launchedContainers.values()) { - if(info.container.isAMContainer()) { - result.addLast(info.container); - } else { - result.addFirst(info.container); + public List getRunningContainersWithAMsAtTheEnd() { + try (AutoCloseableRWLock l = lock.acquireRead()) { + LinkedList result = new LinkedList<>(); + for (ContainerInfo info : launchedContainers.values()) { + if (info.container.isAMContainer()) { + result.addLast(info.container); + } else { + result.addFirst(info.container); + } } + return result; } - return result; } /** @@ -430,12 +489,14 @@ public int getNumContainers() { * Get the launched containers in the node. * @return List of launched containers. */ - protected synchronized List getLaunchedContainers() { - List result = new ArrayList<>(); - for (ContainerInfo info : launchedContainers.values()) { - result.add(info.container); + protected List getLaunchedContainers() { + try (AutoCloseableRWLock l = lock.acquireRead()) { + List result = new ArrayList<>(); + for (ContainerInfo info : launchedContainers.values()) { + result.add(info.container); + } + return result; } - return result; } /** @@ -443,37 +504,43 @@ public int getNumContainers() { * @param containerId The container ID * @return The container for the specified container ID */ - protected synchronized RMContainer getContainer(ContainerId containerId) { + protected RMContainer getContainer(ContainerId containerId) { + assert (lock.isLocked()); RMContainer container = null; ContainerInfo info = launchedContainers.get(containerId); if (info != null) { container = info.container; } return container; + } /** * Get the reserved container in the node. * @return Reserved container in the node. */ - public synchronized RMContainer getReservedContainer() { - return reservedContainer; + public RMContainer getReservedContainer() { + try (AutoCloseableRWLock l = lock.acquireRead()) { + return reservedContainer; + } } /** * Set the reserved container in the node. * @param reservedContainer Reserved container in the node. */ - public synchronized void + public void setReservedContainer(RMContainer reservedContainer) { - this.reservedContainer = reservedContainer; + try (AutoCloseableRWLock l = lock.acquireWrite()) { + this.reservedContainer = reservedContainer; + } } /** * Recover a container. * @param rmContainer Container to recover. 
*/ - public synchronized void recoverContainer(RMContainer rmContainer) { + public void recoverContainer(RMContainer rmContainer) { if (rmContainer.getState().equals(RMContainerState.COMPLETED)) { return; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java index 8bee0f8fe6a..a2feec4274b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica; +import org.apache.hadoop.util.AutoCloseableRWLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -55,118 +56,135 @@ public FiCaSchedulerNode(RMNode node, boolean usePortForNodeName) { } @Override - public synchronized void reserveResource( + public void reserveResource( SchedulerApplicationAttempt application, SchedulerRequestKey priority, RMContainer container) { - // Check if it's already reserved - RMContainer reservedContainer = getReservedContainer(); - if (reservedContainer != null) { - // Sanity check - if (!container.getContainer().getNodeId().equals(getNodeID())) { - throw new IllegalStateException("Trying to reserve" + - " container " + container + - " on node " + container.getReservedNode() + - " when currently" + " reserved resource " + reservedContainer + - " on node " + reservedContainer.getReservedNode()); - } - - // Cannot reserve more than one application attempt on a given node! - // Reservation is still against attempt. 
- if (!reservedContainer.getContainer().getId().getApplicationAttemptId() - .equals(container.getContainer().getId().getApplicationAttemptId())) { - throw new IllegalStateException("Trying to reserve" + - " container " + container + - " for application " + application.getApplicationAttemptId() + - " when currently" + - " reserved container " + reservedContainer + - " on node " + this); - } - - if (LOG.isDebugEnabled()) { - LOG.debug("Updated reserved container " - + container.getContainer().getId() + " on node " + this - + " for application attempt " - + application.getApplicationAttemptId()); - } - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Reserved container " - + container.getContainer().getId() + " on node " + this - + " for application attempt " - + application.getApplicationAttemptId()); + try (AutoCloseableRWLock l = lock.acquireWrite()) { + // Check if it's already reserved + RMContainer reservedContainer = getReservedContainer(); + if (reservedContainer != null) { + // Sanity check + if (!container.getContainer().getNodeId().equals(getNodeID())) { + throw new IllegalStateException("Trying to reserve" + + " container " + container + + " on node " + container.getReservedNode() + + " when currently" + " reserved resource " + reservedContainer + + " on node " + reservedContainer.getReservedNode()); + } + + // Cannot reserve more than one application attempt on a given node! + // Reservation is still against attempt. + if (!reservedContainer.getContainer().getId().getApplicationAttemptId() + .equals( + container.getContainer().getId().getApplicationAttemptId())) { + throw new IllegalStateException("Trying to reserve" + + " container " + container + + " for application " + application.getApplicationAttemptId() + + " when currently" + + " reserved container " + reservedContainer + + " on node " + this); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Updated reserved container " + + container.getContainer().getId() + " on node " + this + + " for application attempt " + + application.getApplicationAttemptId()); + } + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Reserved container " + + container.getContainer().getId() + " on node " + this + + " for application attempt " + + application.getApplicationAttemptId()); + } } + setReservedContainer(container); } - setReservedContainer(container); } @Override - public synchronized void unreserveResource( + public void unreserveResource( SchedulerApplicationAttempt application) { - // adding NP checks as this can now be called for preemption - if (getReservedContainer() != null - && getReservedContainer().getContainer() != null - && getReservedContainer().getContainer().getId() != null - && getReservedContainer().getContainer().getId() + try (AutoCloseableRWLock l = lock.acquireWrite()) { + // adding NP checks as this can now be called for preemption + if (getReservedContainer() != null + && getReservedContainer().getContainer() != null + && getReservedContainer().getContainer().getId() != null + && getReservedContainer().getContainer().getId() .getApplicationAttemptId() != null) { - // Cannot unreserve for wrong application... 
- ApplicationAttemptId reservedApplication = - getReservedContainer().getContainer().getId() - .getApplicationAttemptId(); - if (!reservedApplication.equals( - application.getApplicationAttemptId())) { - throw new IllegalStateException("Trying to unreserve " + - " for application " + application.getApplicationAttemptId() + - " when currently reserved " + - " for application " + reservedApplication.getApplicationId() + - " on node " + this); + // Cannot unreserve for wrong application... + ApplicationAttemptId reservedApplication = + getReservedContainer().getContainer().getId() + .getApplicationAttemptId(); + if (!reservedApplication.equals( + application.getApplicationAttemptId())) { + throw new IllegalStateException("Trying to unreserve " + + " for application " + application.getApplicationAttemptId() + + " when currently reserved " + + " for application " + reservedApplication.getApplicationId() + + " on node " + this); + } } + setReservedContainer(null); } - setReservedContainer(null); } // According to decisions from preemption policy, mark the container to killable - public synchronized void markContainerToKillable(ContainerId containerId) { - RMContainer c = getContainer(containerId); - if (c != null && !killableContainers.containsKey(containerId)) { - killableContainers.put(containerId, c); - Resources.addTo(totalKillableResources, c.getAllocatedResource()); + public void markContainerToKillable(ContainerId containerId) { + try (AutoCloseableRWLock l = lock.acquireWrite()) { + RMContainer c = getContainer(containerId); + if (c != null && !killableContainers.containsKey(containerId)) { + killableContainers.put(containerId, c); + Resources.addTo(totalKillableResources, c.getAllocatedResource()); + } } } // According to decisions from preemption policy, mark the container to // non-killable - public synchronized void markContainerToNonKillable(ContainerId containerId) { - RMContainer c = getContainer(containerId); - if (c != null && killableContainers.containsKey(containerId)) { - killableContainers.remove(containerId); - Resources.subtractFrom(totalKillableResources, c.getAllocatedResource()); + public void markContainerToNonKillable(ContainerId containerId) { + try (AutoCloseableRWLock l = lock.acquireWrite()) { + RMContainer c = getContainer(containerId); + if (c != null && killableContainers.containsKey(containerId)) { + killableContainers.remove(containerId); + Resources + .subtractFrom(totalKillableResources, c.getAllocatedResource()); + } } } @Override - protected synchronized void updateResourceForReleasedContainer( - Container container) { - super.updateResourceForReleasedContainer(container); - if (killableContainers.containsKey(container.getId())) { - Resources.subtractFrom(totalKillableResources, container.getResource()); - killableContainers.remove(container.getId()); + protected void updateResourceForReleasedContainer(Container container) { + try (AutoCloseableRWLock l = lock.acquireWrite()) { + super.updateResourceForReleasedContainer(container); + if (killableContainers.containsKey(container.getId())) { + Resources.subtractFrom(totalKillableResources, container.getResource()); + killableContainers.remove(container.getId()); + } } } - public synchronized Resource getTotalKillableResources() { - return totalKillableResources; + public Resource getTotalKillableResources() { + try (AutoCloseableRWLock l = lock.acquireRead()) { + return totalKillableResources; + } } - public synchronized Map getKillableContainers() { - return Collections.unmodifiableMap(killableContainers); 
+ public Map getKillableContainers() { + try (AutoCloseableRWLock l = lock.acquireRead()) { + return Collections.unmodifiableMap(killableContainers); + } } - protected synchronized void allocateContainer(RMContainer rmContainer, + protected void allocateContainer(RMContainer rmContainer, boolean launchedOnNode) { - super.allocateContainer(rmContainer, launchedOnNode); - - final Container container = rmContainer.getContainer(); + Container container = null; + try (AutoCloseableRWLock l = lock.acquireWrite()) { + super.allocateContainerInternal(rmContainer, launchedOnNode); + container = rmContainer.getContainer(); + } LOG.info("Assigned container " + container.getId() + " of capacity " + container.getResource() + " on host " + getRMNode().getNodeAddress() + ", which has " + getNumContainers() + " containers, " diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java index 8ae1e2a2f99..e6c61d57a3a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java @@ -20,6 +20,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; +import org.apache.hadoop.util.AutoCloseableRWLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -85,66 +86,73 @@ Resource getTotalReserved() { } @Override - public synchronized void reserveResource( - SchedulerApplicationAttempt application, SchedulerRequestKey schedulerKey, - RMContainer container) { - // Check if it's already reserved - RMContainer reservedContainer = getReservedContainer(); - if (reservedContainer != null) { - // Sanity check - if (!container.getContainer().getNodeId().equals(getNodeID())) { - throw new IllegalStateException("Trying to reserve" + - " container " + container + - " on node " + container.getReservedNode() + - " when currently" + " reserved resource " + reservedContainer + - " on node " + reservedContainer.getReservedNode()); - } - - // Cannot reserve more than one application on a given node! 
- if (!reservedContainer.getContainer().getId().getApplicationAttemptId() - .equals(container.getContainer().getId().getApplicationAttemptId())) { - throw new IllegalStateException("Trying to reserve" + - " container " + container + - " for application " + application.getApplicationId() + - " when currently" + - " reserved container " + reservedContainer + - " on node " + this); - } + public void reserveResource(SchedulerApplicationAttempt application, + SchedulerRequestKey schedulerKey, + RMContainer container) { + try (AutoCloseableRWLock l = lock.acquireWrite()) { + // Check if it's already reserved + RMContainer reservedContainer = getReservedContainer(); + if (reservedContainer != null) { + // Sanity check + if (!container.getContainer().getNodeId().equals(getNodeID())) { + throw new IllegalStateException("Trying to reserve" + + " container " + container + + " on node " + container.getReservedNode() + + " when currently" + " reserved resource " + reservedContainer + + " on node " + reservedContainer.getReservedNode()); + } + + // Cannot reserve more than one application on a given node! + if (!reservedContainer.getContainer().getId().getApplicationAttemptId() + .equals( + container.getContainer().getId().getApplicationAttemptId())) { + throw new IllegalStateException("Trying to reserve" + + " container " + container + + " for application " + application.getApplicationId() + + " when currently" + + " reserved container " + reservedContainer + + " on node " + this); + } - LOG.info("Updated reserved container " + container.getContainer().getId() - + " on node " + this + " for application " - + application.getApplicationId()); - } else { - LOG.info("Reserved container " + container.getContainer().getId() - + " on node " + this + " for application " - + application.getApplicationId()); + LOG.info( + "Updated reserved container " + container.getContainer().getId() + + " on node " + this + " for application " + + application.getApplicationId()); + } else { + LOG.info("Reserved container " + container.getContainer().getId() + + " on node " + this + " for application " + + application.getApplicationId()); + } + setReservedContainer(container); + this.reservedAppSchedulable = (FSAppAttempt) application; } - setReservedContainer(container); - this.reservedAppSchedulable = (FSAppAttempt) application; } @Override - public synchronized void unreserveResource( - SchedulerApplicationAttempt application) { - // Cannot unreserve for wrong application... - ApplicationAttemptId reservedApplication = - getReservedContainer().getContainer().getId() - .getApplicationAttemptId(); - if (!reservedApplication.equals( - application.getApplicationAttemptId())) { - throw new IllegalStateException("Trying to unreserve " + - " for application " + application.getApplicationId() + - " when currently reserved " + - " for application " + reservedApplication.getApplicationId() + - " on node " + this); + public void unreserveResource(SchedulerApplicationAttempt application) { + try (AutoCloseableRWLock l = lock.acquireWrite()) { + // Cannot unreserve for wrong application... 
+ ApplicationAttemptId reservedApplication = + getReservedContainer().getContainer().getId() + .getApplicationAttemptId(); + if (!reservedApplication.equals( + application.getApplicationAttemptId())) { + throw new IllegalStateException("Trying to unreserve " + + " for application " + application.getApplicationId() + + " when currently reserved " + + " for application " + reservedApplication.getApplicationId() + + " on node " + this); + } + + setReservedContainer(null); + this.reservedAppSchedulable = null; } - - setReservedContainer(null); - this.reservedAppSchedulable = null; } - synchronized FSAppAttempt getReservedAppSchedulable() { - return reservedAppSchedulable; + FSAppAttempt getReservedAppSchedulable() { + try (AutoCloseableRWLock l = lock.acquireRead()) { + return reservedAppSchedulable; + } } /** @@ -153,41 +161,42 @@ synchronized FSAppAttempt getReservedAppSchedulable() { * @return if any resources were allocated */ @VisibleForTesting - synchronized LinkedHashMap getPreemptionList() { - cleanupPreemptionList(); - return new LinkedHashMap<>(resourcesPreemptedForApp); + LinkedHashMap getPreemptionList() { + try (AutoCloseableRWLock l = lock.acquireWrite()) { + cleanupPreemptionList(); + return new LinkedHashMap<>(resourcesPreemptedForApp); + } } /** * Returns whether a preemption is tracked on the node for the specified app. * @return if preempted containers are reserved for the app */ - synchronized boolean isPreemptedForApp(FSAppAttempt app){ - return resourcesPreemptedForApp.containsKey(app); + boolean isPreemptedForApp(FSAppAttempt app){ + try (AutoCloseableRWLock l = lock.acquireRead()) { + return resourcesPreemptedForApp.containsKey(app); + } } /** * Remove apps that have their preemption requests fulfilled. + * Write lock must be grabbed */ private void cleanupPreemptionList() { // Synchronize separately to avoid potential deadlocks // This may cause delayed deletion of reservations LinkedList candidates; - synchronized (this) { - candidates = Lists.newLinkedList(resourcesPreemptedForApp.keySet()); - } + candidates = Lists.newLinkedList(resourcesPreemptedForApp.keySet()); for (FSAppAttempt app : candidates) { if (app.isStopped() || !app.isStarved() || (Resources.isNone(app.getFairshareStarvation()) && Resources.isNone(app.getMinshareStarvation()))) { // App does not need more resources - synchronized (this) { - Resource removed = resourcesPreemptedForApp.remove(app); - if (removed != null) { - Resources.subtractFrom(totalResourcesPreempted, - removed); - appIdToAppMap.remove(app.getApplicationAttemptId()); - } + Resource removed = resourcesPreemptedForApp.remove(app); + if (removed != null) { + Resources.subtractFrom(totalResourcesPreempted, + removed); + appIdToAppMap.remove(app.getApplicationAttemptId()); } } } @@ -205,14 +214,13 @@ void addContainersForPreemption(Collection containers, FSAppAttempt app) { Resource appReserved = Resources.createResource(0); - - for(RMContainer container : containers) { - if(containersForPreemption.add(container)) { - Resources.addTo(appReserved, container.getAllocatedResource()); + try (AutoCloseableRWLock l = lock.acquireWrite()) { + for (RMContainer container : containers) { + if (containersForPreemption.add(container)) { + Resources.addTo(appReserved, container.getAllocatedResource()); + } } - } - synchronized (this) { if (!Resources.isNone(appReserved)) { Resources.addTo(totalResourcesPreempted, appReserved); @@ -221,6 +229,7 @@ void addContainersForPreemption(Collection containers, putIfAbsent(app, Resource.newInstance(0, 0)); 
Resources.addTo(resourcesPreemptedForApp.get(app), appReserved); } + } } @@ -238,36 +247,39 @@ void addContainersForPreemption(Collection containers, * @param launchedOnNode True if the container has been launched */ @Override - protected synchronized void allocateContainer(RMContainer rmContainer, - boolean launchedOnNode) { - super.allocateContainer(rmContainer, launchedOnNode); - if (LOG.isDebugEnabled()) { - final Container container = rmContainer.getContainer(); - LOG.debug("Assigned container " + container.getId() + " of capacity " - + container.getResource() + " on host " + getRMNode().getNodeAddress() - + ", which has " + getNumContainers() + " containers, " - + getAllocatedResource() + " used and " + getUnallocatedResource() - + " available after allocation"); - } + protected void allocateContainer(RMContainer rmContainer, + boolean launchedOnNode) { + try (AutoCloseableRWLock l = lock.acquireWrite()) { + super.allocateContainerInternal(rmContainer, launchedOnNode); + if (LOG.isDebugEnabled()) { + final Container container = rmContainer.getContainer(); + LOG.debug("Assigned container " + container.getId() + " of capacity " + + container.getResource() + " on host " + getRMNode() + .getNodeAddress() + + ", which has " + getNumContainers() + " containers, " + + getAllocatedResource() + " used and " + getUnallocatedResource() + + " available after allocation"); + } - Resource allocated = rmContainer.getAllocatedResource(); - if (!Resources.isNone(allocated)) { - // check for satisfied preemption request and update bookkeeping - FSAppAttempt app = - appIdToAppMap.get(rmContainer.getApplicationAttemptId()); - if (app != null) { - Resource reserved = resourcesPreemptedForApp.get(app); - Resource fulfilled = Resources.componentwiseMin(reserved, allocated); - Resources.subtractFrom(reserved, fulfilled); - Resources.subtractFrom(totalResourcesPreempted, fulfilled); - if (Resources.isNone(reserved)) { - // No more preempted containers - resourcesPreemptedForApp.remove(app); - appIdToAppMap.remove(rmContainer.getApplicationAttemptId()); + Resource allocated = rmContainer.getAllocatedResource(); + if (!Resources.isNone(allocated)) { + // check for satisfied preemption request and update bookkeeping + FSAppAttempt app = + appIdToAppMap.get(rmContainer.getApplicationAttemptId()); + if (app != null) { + Resource reserved = resourcesPreemptedForApp.get(app); + Resource fulfilled = Resources.componentwiseMin(reserved, allocated); + Resources.subtractFrom(reserved, fulfilled); + Resources.subtractFrom(totalResourcesPreempted, fulfilled); + if (Resources.isNone(reserved)) { + // No more preempted containers + resourcesPreemptedForApp.remove(app); + appIdToAppMap.remove(rmContainer.getApplicationAttemptId()); + } } + } else { + LOG.error("Allocated empty container" + rmContainer.getContainerId()); } - } else { - LOG.error("Allocated empty container" + rmContainer.getContainerId()); } } @@ -279,12 +291,14 @@ protected synchronized void allocateContainer(RMContainer rmContainer, * @param releasedByNode whether the release originates from a node update. 
*/ @Override - public synchronized void releaseContainer(ContainerId containerId, - boolean releasedByNode) { - RMContainer container = getContainer(containerId); - super.releaseContainer(containerId, releasedByNode); - if (container != null) { - containersForPreemption.remove(container); + public void releaseContainer(ContainerId containerId, + boolean releasedByNode) { + try (AutoCloseableRWLock l = lock.acquireWrite()) { + RMContainer container = getContainer(containerId); + super.releaseContainerInternal(containerId, releasedByNode); + if (container != null) { + containersForPreemption.remove(container); + } } } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocation/AllocationFileQueueParser.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocation/AllocationFileQueueParser.java index 72c6c6801b3..855845beb6a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocation/AllocationFileQueueParser.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocation/AllocationFileQueueParser.java @@ -16,6 +16,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocation; +import java.util.HashMap; import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.ReservationACL; @@ -220,6 +221,7 @@ private void loadQueue(String parentName, Element element, isLeaf = false; } } + builder.initReservationAclsifAbsent(queueName); // if a leaf in the alloc file is marked as type='parent' // then store it as a parent queue diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocation/QueueProperties.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocation/QueueProperties.java index c2ee8c1ff1d..23d740a1fd4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocation/QueueProperties.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocation/QueueProperties.java @@ -249,6 +249,11 @@ public Builder reservationAcls(String queueName, return this; } + public Builder initReservationAclsifAbsent(String queueName) { + this.reservationAcls.putIfAbsent(queueName, new HashMap<>()); + return this; + } + public Builder reservableQueues(String queue) { this.reservableQueues.add(queue); return this;