diff --git hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/AutoCloseableRWLock.java hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/AutoCloseableRWLock.java new file mode 100644 index 00000000000..d08d5f0d587 --- /dev/null +++ hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/AutoCloseableRWLock.java @@ -0,0 +1,185 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.util; + +import com.google.common.annotations.VisibleForTesting; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * This is a wrap class of a ReentrantLock readWrite lock. Extending AutoCloseable + * interface such that the users can use a try-with-resource syntax. + */ +public class AutoCloseableRWLock implements AutoCloseable { + private final ReentrantReadWriteLock lock; + + /** + * Creates an instance of {@code AutoCloseableRWLock}, initializes + * the underlying lock instance with a new {@code ReentrantReadWriteLock}. + */ + public AutoCloseableRWLock() { + this(new ReentrantReadWriteLock()); + } + + /** + * Wrap provided Lock instance. + * @param lock Lock instance to wrap in AutoCloseable API. 
+ */ + public AutoCloseableRWLock(ReentrantReadWriteLock lock) { + this.lock = lock; + } + + /** + * A wrapper method that makes a call to {@code readlock()} of the underlying + * {@code ReentrantReadWriteLock} object. + * + * Acquires the lock if it is not held by another thread, then sets the + * lock hold count to one and returns immediately. + * + * If the current thread already holds the lock, increases the lock + * hold count by one and returns immediately. + * + * If the lock is held by another thread, the current thread is + * suspended until the lock has been acquired by the current thread. + * + * @return The {@code AutoCloseableRWLock} object itself. This is to + * support try-with-resource syntax. + */ + public AutoCloseableRWLock acquireRead() { + this.lock.readLock().lock(); + return this; + } + + /** + * A wrapper method that makes a call to {@code writelock()} of the underlying + * {@code ReentrantReadWriteLock} object. + * + * Acquires the lock if it is not held by another thread, then sets the + * lock hold count to one and returns immediately. + * + * If the current thread already holds the lock, increases the lock + * hold count by one and returns immediately. + * + * If the lock is held by another thread, the current thread is + * suspended until the lock has been acquired by the current thread. + * + * @return The {@code AutoCloseableRWLock} object itself. This is to + * support try-with-resource syntax. + */ + public AutoCloseableRWLock acquireWrite() { + this.lock.writeLock().lock(); + return this; + } + + /** + * A wrapper method that makes a call to {@code tryLock()} of + * the underlying {@code ReadLock} object. + * + * If the lock is not held by another thread, acquires the lock, sets the + * hold count to one and returns {@code true}. + * + * If the current thread already holds the lock, increments the hold + * count by one and returns {@code true}. + * + * If the lock is held by another thread then the method returns + * immediately with {@code false}. 
* + * @return {@code true} if the lock was free and was acquired by the + * current thread, or the lock was already held by the current + * thread; and {@code false} otherwise. + */ + public boolean tryRLock() { + return this.lock.readLock().tryLock(); + } + + /** + * A wrapper method that makes a call to {@code tryLock()} of + * the underlying {@code WriteLock} object. + * + * If the lock is not held by another thread, acquires the lock, sets the + * hold count to one and returns {@code true}. + * + * If the current thread already holds the lock, increments the hold + * count by one and returns {@code true}. + * + * If the lock is held by another thread then the method returns + * immediately with {@code false}. + * + * @return {@code true} if the lock was free and was acquired by the + * current thread, or the lock was already held by the current + * thread; and {@code false} otherwise. + */ + public boolean tryWLock() { + return this.lock.writeLock().tryLock(); + } + + /** + * A wrapper method that makes a call to {@code unlock()} of the + * underlying {@code ReentrantReadWriteLock} object. + * + * Attempts to release the lock. + * + * If the current thread holds the lock, decrements the hold + * count. If the hold count reaches zero, the lock is released. + * + * If the current thread does not hold the lock, then + * {@link IllegalMonitorStateException} is thrown. + */ + public void release() { + // first release the current thread's read lock to support reentrant downgrades + if (this.lock.getReadHoldCount() > 0) { + this.lock.readLock().unlock(); + return; + } + if (this.lock.isWriteLocked()) { + this.lock.writeLock().unlock(); + } + } + + /** + * Attempts to release the lock by making a call to {@code release()}. + * + * This is to implement {@code close()} method from {@code AutoCloseable} + * interface. This allows users to use a try-with-resource syntax, where + * the lock can be automatically released. 
+ */ + @Override + public void close() { + release(); + } + + /** + * A wrapper method that makes a call to {@code isLocked()} of + * the underlying {@code ReentrantLock} object. + * + * Queries if this lock is held by any thread. This method is + * designed for use in monitoring of the system state, + * not for synchronization control. + * + * @return {@code true} if any thread holds this lock and + * {@code false} otherwise + */ + @VisibleForTesting + public boolean isLocked() { + return (this.lock.isWriteLocked() || this.lock.getReadLockCount() > 0); + } + + public boolean isExcLockedByCurrentThread() { + return this.lock.isWriteLockedByCurrentThread(); + } +} diff --git hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestAutoCloseableRWLock.java hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestAutoCloseableRWLock.java new file mode 100644 index 00000000000..62f941a414a --- /dev/null +++ hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestAutoCloseableRWLock.java @@ -0,0 +1,176 @@ +package org.apache.hadoop.util; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.junit.Test; + +public class TestAutoCloseableRWLock { + /** + * Test the basic lock and unlock operation. + */ + @Test + public void testLockAcquireRelease() { + AutoCloseableRWLock lock = new AutoCloseableRWLock(); + AutoCloseableRWLock newlock = lock.acquireWrite(); + // Ensure acquire the same lock object. + assertEquals(newlock, lock); + // Ensure it locked now. + assertTrue(lock.isLocked()); + lock.release(); + newlock = lock.acquireRead(); + // Ensure acquire the same lock object. + assertEquals(newlock, lock); + lock.release(); + // Ensure it is unlocked now. 
+ assertFalse(lock.isLocked()); + } + + @Test + public void testDowngradeLock() throws Exception { + AutoCloseableRWLock lock = new AutoCloseableRWLock(); + AutoCloseableRWLock newlock = lock.acquireWrite(); + // Ensure acquire the same lock object. + assertEquals(newlock, lock); + // Ensure it locked now. + assertTrue(lock.isLocked()); + // can we downgrade before releasing lock? + boolean tryRead = newlock.tryRLock(); + assertTrue(tryRead); + // try to read from another thread should fail + Thread competingWriteThreadA = new Thread() { + @Override + public void run() { + assertTrue(lock.isLocked()); + assertFalse(lock.tryWLock()); + assertFalse(lock.tryRLock()); + } + }; + competingWriteThreadA.start(); + competingWriteThreadA.join(); + lock.release(); + // Ensure it is still locked now because of writeLocked. + assertTrue(lock.isLocked()); + // try to read from another thread should fail + Thread competingWriteThread = new Thread() { + @Override + public void run() { + assertTrue(lock.isLocked()); + assertFalse(lock.tryWLock()); + assertFalse(lock.tryRLock()); + } + }; + competingWriteThread.start(); + competingWriteThread.join(); + assertTrue(lock.isLocked()); + lock.release(); + assertFalse(lock.isLocked()); + } + + /** + * Test when write lock is acquired, no other thread can + * lock it. + * + * @throws Exception + */ + @Test + public void testWriteMultipleThread() throws Exception { + AutoCloseableRWLock lock = new AutoCloseableRWLock(); + lock.acquireWrite(); + assertTrue(lock.isLocked()); + Thread competingThread = new Thread() { + @Override + public void run() { + assertTrue(lock.isLocked()); + assertFalse(lock.tryWLock()); + assertFalse(lock.tryRLock()); + } + }; + competingThread.start(); + competingThread.join(); + assertTrue(lock.isLocked()); + lock.release(); + assertFalse(lock.isLocked()); + } + + /** + * Test when read lock is acquired, other thread can + * read lock it. 
+ * + * @throws Exception + */ + @Test + public void testReadMultipleThread() throws Exception { + AutoCloseableRWLock lock = new AutoCloseableRWLock(); + lock.acquireRead(); + assertTrue(lock.isLocked()); + Thread competingThread = new Thread() { + @Override + public void run() { + assertTrue(lock.isLocked()); + assertFalse(lock.tryWLock()); + assertTrue(lock.tryRLock()); + lock.release(); + } + }; + competingThread.start(); + competingThread.join(); + assertTrue(lock.isLocked()); + lock.release(); + assertFalse(lock.isLocked()); + } + + /** + * Test the correctness under try-with-resource syntax. + * + * @throws Exception + */ + @Test + public void testWriteTryWithResourceSyntax() throws Exception { + AutoCloseableRWLock lock = new AutoCloseableRWLock(); + try(AutoCloseableRWLock localLock = lock.acquireWrite()) { + assertEquals(localLock, lock); + assertTrue(lock.isLocked()); + Thread competingThread = new Thread() { + @Override + public void run() { + assertTrue(lock.isLocked()); + assertFalse(lock.tryRLock()); + assertFalse(lock.tryWLock()); + } + }; + competingThread.start(); + competingThread.join(); + assertTrue(localLock.isLocked()); + } + assertFalse(lock.isLocked()); + } + + /** + * Test the correctness under try-with-resource syntax. 
+ * + * @throws Exception + */ + @Test + public void testReadTryWithResourceSyntax() throws Exception { + AutoCloseableRWLock lock = new AutoCloseableRWLock(); + try(AutoCloseableRWLock localLock = lock.acquireRead()) { + assertEquals(localLock, lock); + assertTrue(lock.isLocked()); + Thread competingThread = new Thread() { + @Override + public void run() { + assertTrue(lock.isLocked()); + assertTrue(lock.tryRLock()); + assertFalse(lock.tryWLock()); + lock.release(); + } + }; + competingThread.start(); + competingThread.join(); + assertTrue(localLock.isLocked()); + } + assertFalse(lock.isLocked()); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index a565fe75656..fad0b039934 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -31,11 +31,8 @@ import java.util.Set; import java.util.TreeSet; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; -import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; - import org.apache.commons.collections.keyvalue.DefaultMapEntry; +import org.apache.hadoop.util.AutoCloseableRWLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -110,9 +107,6 @@ private static final RecordFactory recordFactory = RecordFactoryProvider 
.getRecordFactory(null); - private final ReadLock readLock; - private final WriteLock writeLock; - private final ConcurrentLinkedQueue nodeUpdateQueue; private volatile boolean nextHeartBeat = true; @@ -134,7 +128,7 @@ private String nodeManagerVersion; private Integer decommissioningTimeout; - private long timeStamp; + private volatile long timeStamp; /* Aggregated resource utilization for the containers. */ private ResourceUtilization containersUtilization; /* Resource utilization for the node. */ @@ -377,6 +371,9 @@ private final StateMachine stateMachine; + // autocloseable reentrant read write lock + private final AutoCloseableRWLock lock; + public RMNodeImpl(NodeId nodeId, RMContext context, String hostName, int cmPort, int httpPort, Node node, Resource capability, String nodeManagerVersion) { @@ -387,6 +384,7 @@ public RMNodeImpl(NodeId nodeId, RMContext context, String hostName, public RMNodeImpl(NodeId nodeId, RMContext context, String hostName, int cmPort, int httpPort, Node node, Resource capability, String nodeManagerVersion, Resource physResource) { + this.lock = new AutoCloseableRWLock(); this.nodeId = nodeId; this.context = context; this.hostName = hostName; @@ -404,10 +402,6 @@ public RMNodeImpl(NodeId nodeId, RMContext context, String hostName, this.latestNodeHeartBeatResponse.setResponseId(0); - ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); - this.readLock = lock.readLock(); - this.writeLock = lock.writeLock(); - this.stateMachine = stateMachineFactory.make(this); this.nodeUpdateQueue = new ConcurrentLinkedQueue(); @@ -482,43 +476,27 @@ public Node getNode() { @Override public String getHealthReport() { - this.readLock.lock(); - - try { + try (AutoCloseableRWLock l = lock.acquireRead()) { return this.healthReport; - } finally { - this.readLock.unlock(); } } - - public void setHealthReport(String healthReport) { - this.writeLock.lock(); - try { - this.healthReport = healthReport; - } finally { - this.writeLock.unlock(); - } + public 
AutoCloseableRWLock getLock() { + return lock; } - - public void setLastHealthReportTime(long lastHealthReportTime) { - this.writeLock.lock(); - try { - this.lastHealthReportTime = lastHealthReportTime; - } finally { - this.writeLock.unlock(); - } + private void setHealthReportInternal(String hReport) { + this.healthReport = hReport; } + private void setLastHealthReportTimeInternal(long lastHReportTime) { + this.lastHealthReportTime = lastHReportTime; + } + @Override public long getLastHealthReportTime() { - this.readLock.lock(); - - try { + try (AutoCloseableRWLock l = lock.acquireRead()) { return this.lastHealthReportTime; - } finally { - this.readLock.unlock(); } } @@ -529,45 +507,25 @@ public String getNodeManagerVersion() { @Override public ResourceUtilization getAggregatedContainersUtilization() { - this.readLock.lock(); - - try { + try (AutoCloseableRWLock l = lock.acquireRead()) { return this.containersUtilization; - } finally { - this.readLock.unlock(); } } - public void setAggregatedContainersUtilization( + private void setAggregatedContainersUtilizationInternal( ResourceUtilization containersUtilization) { - this.writeLock.lock(); - - try { - this.containersUtilization = containersUtilization; - } finally { - this.writeLock.unlock(); - } + this.containersUtilization = containersUtilization; } @Override public ResourceUtilization getNodeUtilization() { - this.readLock.lock(); - - try { + try (AutoCloseableRWLock l = lock.acquireRead()) { return this.nodeUtilization; - } finally { - this.readLock.unlock(); } } - public void setNodeUtilization(ResourceUtilization nodeUtilization) { - this.writeLock.lock(); - - try { - this.nodeUtilization = nodeUtilization; - } finally { - this.writeLock.unlock(); - } + private void setNodeUtilizationInternal(ResourceUtilization nodeUtil) { + this.nodeUtilization = nodeUtil; } @Override @@ -581,55 +539,36 @@ public void setPhysicalResource(Resource physicalResource) { @Override public NodeState getState() { - 
this.readLock.lock(); - - try { + try (AutoCloseableRWLock l = lock.acquireRead()) { return this.stateMachine.getCurrentState(); - } finally { - this.readLock.unlock(); } } @Override public List getAppsToCleanup() { - this.readLock.lock(); - - try { + try (AutoCloseableRWLock l = lock.acquireRead()) { return new ArrayList(this.finishedApplications); - } finally { - this.readLock.unlock(); } - } @Override public List getRunningApps() { - this.readLock.lock(); - try { + try (AutoCloseableRWLock l = lock.acquireRead()) { return new ArrayList(this.runningApplications); - } finally { - this.readLock.unlock(); } } @Override public List getContainersToCleanUp() { - - this.readLock.lock(); - - try { + try (AutoCloseableRWLock l = lock.acquireRead()) { return new ArrayList(this.containersToClean); - } finally { - this.readLock.unlock(); } - }; + } @Override public void setAndUpdateNodeHeartbeatResponse( NodeHeartbeatResponse response) { - this.writeLock.lock(); - - try { + try (AutoCloseableRWLock l = lock.acquireWrite()) { response.addAllContainersToCleanup( new ArrayList(this.containersToClean)); response.addAllApplicationsToCleanup(this.finishedApplications); @@ -652,10 +591,8 @@ public void setAndUpdateNodeHeartbeatResponse( // Synchronously update the last response in rmNode with updated // responseId this.latestNodeHeartBeatResponse = response; - } finally { - this.writeLock.unlock(); } - }; + } @VisibleForTesting public Collection getToBeUpdatedContainers() { @@ -664,28 +601,21 @@ public void setAndUpdateNodeHeartbeatResponse( @Override public NodeHeartbeatResponse getLastNodeHeartBeatResponse() { - this.readLock.lock(); - try { + try (AutoCloseableRWLock l = lock.acquireRead()) { return this.latestNodeHeartBeatResponse; - } finally { - this.readLock.unlock(); } } @Override public void resetLastNodeHeartBeatResponse() { - this.writeLock.lock(); - try { + try (AutoCloseableRWLock l = lock.acquireWrite()) { latestNodeHeartBeatResponse.setResponseId(0); - } finally { - 
this.writeLock.unlock(); } } public void handle(RMNodeEvent event) { LOG.debug("Processing {} of type {}", event.getNodeId(), event.getType()); - writeLock.lock(); - try { + try (AutoCloseableRWLock l = lock.acquireWrite()) { NodeState oldState = getState(); try { stateMachine.doTransition(event.getType(), event); @@ -699,10 +629,6 @@ public void handle(RMNodeEvent event) { + getState()); } } - - finally { - writeLock.unlock(); - } } private void updateMetricsForRejoinedNode(NodeState previousNodeState) { @@ -838,15 +764,19 @@ private static void updateNodeResourceFromEvent(RMNodeImpl rmNode, private static NodeHealthStatus updateRMNodeFromStatusEvents( RMNodeImpl rmNode, RMNodeStatusEvent statusEvent) { - // Switch the last heartbeatresponse. - NodeHealthStatus remoteNodeHealthStatus = statusEvent.getNodeHealthStatus(); - rmNode.setHealthReport(remoteNodeHealthStatus.getHealthReport()); - rmNode.setLastHealthReportTime(remoteNodeHealthStatus - .getLastHealthReportTime()); - rmNode.setAggregatedContainersUtilization(statusEvent - .getAggregatedContainersUtilization()); - rmNode.setNodeUtilization(statusEvent.getNodeUtilization()); - return remoteNodeHealthStatus; + // acquire the lock to do a sequence of updates + try (AutoCloseableRWLock l = rmNode.lock.acquireWrite()) { + // Switch the last heartbeatresponse. 
+ NodeHealthStatus remoteNodeHealthStatus = statusEvent + .getNodeHealthStatus(); + rmNode.setHealthReportInternal(remoteNodeHealthStatus.getHealthReport()); + rmNode.setLastHealthReportTimeInternal(remoteNodeHealthStatus + .getLastHealthReportTime()); + rmNode.setAggregatedContainersUtilizationInternal(statusEvent + .getAggregatedContainersUtilization()); + rmNode.setNodeUtilizationInternal(statusEvent.getNodeUtilization()); + return remoteNodeHealthStatus; + } } public static class AddNodeTransition implements @@ -1287,7 +1217,6 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) { @Override public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) { RMNodeStatusEvent statusEvent = (RMNodeStatusEvent)event; - // Switch the last heartbeatresponse. NodeHealthStatus remoteNodeHealthStatus = updateRMNodeFromStatusEvents( rmNode, statusEvent); @@ -1527,8 +1456,7 @@ private void handleLogAggregationStatus( @Override public List pullNewlyIncreasedContainers() { - writeLock.lock(); - try { + try (AutoCloseableRWLock l = lock.acquireWrite()) { if (nmReportedIncreasedContainers.isEmpty()) { return Collections.emptyList(); } else { @@ -1537,9 +1465,6 @@ private void handleLogAggregationStatus( nmReportedIncreasedContainers.clear(); return container; } - - } finally { - writeLock.unlock(); } } @@ -1548,23 +1473,15 @@ public Resource getOriginalTotalCapability() { } public OpportunisticContainersStatus getOpportunisticContainersStatus() { - this.readLock.lock(); - - try { + try (AutoCloseableRWLock l = lock.acquireRead()) { return this.opportunisticContainersStatus; - } finally { - this.readLock.unlock(); } } public void setOpportunisticContainersStatus( OpportunisticContainersStatus opportunisticContainersStatus) { - this.writeLock.lock(); - - try { + try (AutoCloseableRWLock l = lock.acquireWrite()) { this.opportunisticContainersStatus = opportunisticContainersStatus; - } finally { - this.writeLock.unlock(); } } diff --git 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java index ef03aadf1a0..a820637893a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java @@ -28,6 +28,7 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.commons.lang3.builder.CompareToBuilder; +import org.apache.hadoop.util.AutoCloseableRWLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -90,8 +91,12 @@ // Last updated time private volatile long lastHeartbeatMonotonicTime; + // autocloseable reentrant read write lock + private AutoCloseableRWLock lock; + public SchedulerNode(RMNode node, boolean usePortForNodeName, Set labels) { + lock = new AutoCloseableRWLock(); this.rmNode = node; this.rmContext = node.getRMContext(); this.unallocatedResource = Resources.clone(node.getTotalCapability()); @@ -113,14 +118,27 @@ public RMNode getRMNode() { return this.rmNode; } + public AutoCloseableRWLock getLock() { + return lock; + } + + protected AutoCloseableRWLock acquireWriteLock() { + return lock.acquireWrite(); + } + + protected AutoCloseableRWLock acquireReadLock() { + return lock.acquireRead(); + } /** * Set total resources on the node. * @param resource Total resources on the node. 
*/ - public synchronized void updateTotalResource(Resource resource){ - this.totalResource = resource; - this.unallocatedResource = Resources.subtract(totalResource, - this.allocatedResource); + public void updateTotalResource(Resource resource) { + try (AutoCloseableRWLock l = lock.acquireWrite()) { + this.totalResource = resource; + this.unallocatedResource = Resources.subtract(totalResource, + this.allocatedResource); + } } /** @@ -129,13 +147,15 @@ public synchronized void updateTotalResource(Resource resource){ * are not overcommitted anymore. This may reset a previous timeout. * @param timeOut Time out in milliseconds. */ - public synchronized void setOvercommitTimeOut(long timeOut) { - if (timeOut >= 0) { - if (this.overcommitTimeout != -1) { - LOG.debug("The overcommit timeout for {} was already set to {}", - getNodeID(), this.overcommitTimeout); + public void setOvercommitTimeOut(long timeOut) { + try (AutoCloseableRWLock l = lock.acquireWrite()) { + if (timeOut >= 0) { + if (this.overcommitTimeout != -1) { + LOG.debug("The overcommit timeout for {} was already set to {}", + getNodeID(), this.overcommitTimeout); + } + this.overcommitTimeout = Time.now() + timeOut; } - this.overcommitTimeout = Time.now() + timeOut; } } @@ -143,16 +163,21 @@ public synchronized void setOvercommitTimeOut(long timeOut) { * Check if the time out has passed. * @return If the node is overcommitted. */ - public synchronized boolean isOvercommitTimedOut() { - return this.overcommitTimeout >= 0 && Time.now() >= this.overcommitTimeout; + public boolean isOvercommitTimedOut() { + try (AutoCloseableRWLock l = lock.acquireRead()) { + return this.overcommitTimeout >= 0 + && Time.now() >= this.overcommitTimeout; + } } /** * Check if the node has a time out for overcommit resources. * @return If the node has a time out for overcommit resources. 
*/ - public synchronized boolean isOvercommitTimeOutSet() { - return this.overcommitTimeout >= 0; + public boolean isOvercommitTimeOutSet() { + try (AutoCloseableRWLock l = lock.acquireRead()) { + return this.overcommitTimeout >= 0; + } } /** @@ -208,11 +233,19 @@ public void allocateContainer(RMContainer rmContainer) { * @param rmContainer Allocated container * @param launchedOnNode True if the container has been launched */ - protected synchronized void allocateContainer(RMContainer rmContainer, + protected void allocateContainer(RMContainer rmContainer, boolean launchedOnNode) { + try (AutoCloseableRWLock l = lock.acquireWrite()) { + allocateContainerInternal(rmContainer, launchedOnNode); + } + } + + protected void allocateContainerInternal(RMContainer rmContainer, + boolean launchedOnNode) { + assert (lock.isExcLockedByCurrentThread()); Container container = rmContainer.getContainer(); if (rmContainer.getExecutionType() == ExecutionType.GUARANTEED) { - deductUnallocatedResource(container.getResource()); + deductUnallocatedResourceInternal(container.getResource()); ++numContainers; } @@ -224,43 +257,49 @@ protected synchronized void allocateContainer(RMContainer rmContainer, * Get unallocated resources on the node. * @return Unallocated resources on the node */ - public synchronized Resource getUnallocatedResource() { - return this.unallocatedResource; + public Resource getUnallocatedResource() { + try (AutoCloseableRWLock l = lock.acquireRead()) { + return this.unallocatedResource; + } } /** * Get allocated resources on the node. * @return Allocated resources on the node */ - public synchronized Resource getAllocatedResource() { - return this.allocatedResource; + public Resource getAllocatedResource() { + try (AutoCloseableRWLock l = lock.acquireRead()) { + return this.allocatedResource; + } } /** * Get total resources on the node. * @return Total resources on the node. 
*/ - public synchronized Resource getTotalResource() { - return this.totalResource; + public Resource getTotalResource() { + try (AutoCloseableRWLock l = lock.acquireRead()) { + return this.totalResource; + } } /** * Check if a container is launched by this node. * @return If the container is launched by the node. */ - public synchronized boolean isValidContainer(ContainerId containerId) { - if (launchedContainers.containsKey(containerId)) { - return true; + public boolean isValidContainer(ContainerId containerId) { + try (AutoCloseableRWLock l = lock.acquireRead()) { + return (launchedContainers.containsKey(containerId)); } - return false; } /** * Update the resources of the node when releasing a container. * @param container Container to release. */ - protected synchronized void updateResourceForReleasedContainer( + protected void updateResourceForReleasedContainer( Container container) { + assert (lock.isExcLockedByCurrentThread()); if (container.getExecutionType() == ExecutionType.GUARANTEED) { addUnallocatedResource(container.getResource()); --numContainers; @@ -272,19 +311,38 @@ protected synchronized void updateResourceForReleasedContainer( * @param containerId ID of container to be released. * @param releasedByNode whether the release originates from a node update. 
*/ - public synchronized void releaseContainer(ContainerId containerId, + public void releaseContainer(ContainerId containerId, boolean releasedByNode) { + Container container = null; + try (AutoCloseableRWLock l = lock.acquireWrite()) { + container = releaseContainerInternal(containerId, releasedByNode); + } + + if (LOG.isDebugEnabled() && container != null) { + LOG.debug("Released container " + container.getId() + " of capacity " + + container.getResource() + " on host " + rmNode.getNodeAddress() + + ", which currently has " + numContainers + " containers, " + + getAllocatedResource() + " used and " + getUnallocatedResource() + + " available" + ", release resources=" + true); + } + } + + protected Container releaseContainerInternal(ContainerId containerId, + boolean releasedByNode) { + assert (lock.isExcLockedByCurrentThread()); + Container container = null; + ContainerInfo info = launchedContainers.get(containerId); if (info == null) { - return; + return null; } if (!releasedByNode && info.launchedOnNode) { // wait until node reports container has completed - return; + return null; } launchedContainers.remove(containerId); - Container container = info.container.getContainer(); + container = info.container.getContainer(); // We remove allocation tags when a container is actually // released on NM. This is to avoid running into situation @@ -299,23 +357,20 @@ public synchronized void releaseContainer(ContainerId containerId, updateResourceForReleasedContainer(container); - if (LOG.isDebugEnabled()) { - LOG.debug("Released container " + container.getId() + " of capacity " - + container.getResource() + " on host " + rmNode.getNodeAddress() - + ", which currently has " + numContainers + " containers, " - + getAllocatedResource() + " used and " + getUnallocatedResource() - + " available" + ", release resources=" + true); - } + return container; + } /** * Inform the node that a container has launched. 
* @param containerId ID of the launched container */ - public synchronized void containerStarted(ContainerId containerId) { - ContainerInfo info = launchedContainers.get(containerId); - if (info != null) { - info.launchedOnNode = true; + public void containerStarted(ContainerId containerId) { + try (AutoCloseableRWLock l = lock.acquireWrite()) { + ContainerInfo info = launchedContainers.get(containerId); + if (info != null) { + info.launchedOnNode = true; + } } } @@ -324,7 +379,8 @@ public synchronized void containerStarted(ContainerId containerId) { * container. * @param resource Resources to add. */ - private synchronized void addUnallocatedResource(Resource resource) { + private void addUnallocatedResource(Resource resource) { + assert (lock.isExcLockedByCurrentThread()); if (resource == null) { LOG.error("Invalid resource addition of null resource for " + rmNode.getNodeAddress()); @@ -340,7 +396,14 @@ private synchronized void addUnallocatedResource(Resource resource) { * @param resource Resources to deduct. */ @VisibleForTesting - public synchronized void deductUnallocatedResource(Resource resource) { + public void deductUnallocatedResource(Resource resource) { + try (AutoCloseableRWLock l = lock.acquireWrite()) { + deductUnallocatedResourceInternal(resource); + } + } + + protected void deductUnallocatedResourceInternal(Resource resource) { + assert(lock.isExcLockedByCurrentThread()); if (resource == null) { LOG.error("Invalid deduction of null resource for " + rmNode.getNodeAddress()); @@ -384,10 +447,15 @@ public int getNumContainers() { * Get the containers running on the node. * @return A copy of containers running on the node. 
*/ - public synchronized List getCopiedListOfRunningContainers() { - List result = new ArrayList<>(launchedContainers.size()); - for (ContainerInfo info : launchedContainers.values()) { - result.add(info.container); + public List getCopiedListOfRunningContainers() { + // Copy under the read lock: iterating launchedContainers after the + // lock is released would race with concurrent writers. + List result; + try (AutoCloseableRWLock l = lock.acquireRead()) { + result = new ArrayList<>(launchedContainers.size()); + for (ContainerInfo info : launchedContainers.values()) { + result.add(info.container); + } } return result; } @@ -396,16 +464,18 @@ public int getNumContainers() { * Get the containers running on the node with AM containers at the end. * @return A copy of running containers with AM containers at the end. */ - public synchronized List getRunningContainersWithAMsAtTheEnd() { - LinkedList result = new LinkedList<>(); - for (ContainerInfo info : launchedContainers.values()) { - if(info.container.isAMContainer()) { - result.addLast(info.container); - } else { - result.addFirst(info.container); + public List getRunningContainersWithAMsAtTheEnd() { + try (AutoCloseableRWLock l = lock.acquireRead()) { + LinkedList result = new LinkedList<>(); + for (ContainerInfo info : launchedContainers.values()) { + if (info.container.isAMContainer()) { + result.addLast(info.container); + } else { + result.addFirst(info.container); + } } + return result; } - return result; } /** @@ -430,12 +500,14 @@ public int getNumContainers() { * Get the launched containers in the node. * @return List of launched containers.
*/ - protected synchronized List getLaunchedContainers() { - List result = new ArrayList<>(); - for (ContainerInfo info : launchedContainers.values()) { - result.add(info.container); + protected List getLaunchedContainers() { + try (AutoCloseableRWLock l = lock.acquireRead()) { + List result = new ArrayList<>(); + for (ContainerInfo info : launchedContainers.values()) { + result.add(info.container); + } + return result; } - return result; } /** @@ -443,37 +515,43 @@ public int getNumContainers() { * @param containerId The container ID * @return The container for the specified container ID */ - protected synchronized RMContainer getContainer(ContainerId containerId) { + protected RMContainer getContainer(ContainerId containerId) { + assert (lock.isLocked()); RMContainer container = null; ContainerInfo info = launchedContainers.get(containerId); if (info != null) { container = info.container; } return container; + } /** * Get the reserved container in the node. * @return Reserved container in the node. */ - public synchronized RMContainer getReservedContainer() { - return reservedContainer; + public RMContainer getReservedContainer() { + try (AutoCloseableRWLock l = lock.acquireRead()) { + return reservedContainer; + } } /** * Set the reserved container in the node. * @param reservedContainer Reserved container in the node. */ - public synchronized void + public void setReservedContainer(RMContainer reservedContainer) { - this.reservedContainer = reservedContainer; + try (AutoCloseableRWLock l = lock.acquireWrite()) { + this.reservedContainer = reservedContainer; + } } /** * Recover a container. * @param rmContainer Container to recover. 
*/ - public synchronized void recoverContainer(RMContainer rmContainer) { + public void recoverContainer(RMContainer rmContainer) { if (rmContainer.getState().equals(RMContainerState.COMPLETED)) { return; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java index 8bee0f8fe6a..237cbe6473c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica; +import org.apache.hadoop.util.AutoCloseableRWLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -55,118 +56,135 @@ public FiCaSchedulerNode(RMNode node, boolean usePortForNodeName) { } @Override - public synchronized void reserveResource( + public void reserveResource( SchedulerApplicationAttempt application, SchedulerRequestKey priority, RMContainer container) { - // Check if it's already reserved - RMContainer reservedContainer = getReservedContainer(); - if (reservedContainer != null) { - // Sanity check - if (!container.getContainer().getNodeId().equals(getNodeID())) { - throw new IllegalStateException("Trying to reserve" + - " container " + container + - " on node " + container.getReservedNode() + - " when currently" + " reserved resource " + reservedContainer + - " on node " + 
reservedContainer.getReservedNode()); - } - - // Cannot reserve more than one application attempt on a given node! - // Reservation is still against attempt. - if (!reservedContainer.getContainer().getId().getApplicationAttemptId() - .equals(container.getContainer().getId().getApplicationAttemptId())) { - throw new IllegalStateException("Trying to reserve" + - " container " + container + - " for application " + application.getApplicationAttemptId() + - " when currently" + - " reserved container " + reservedContainer + - " on node " + this); - } - - if (LOG.isDebugEnabled()) { - LOG.debug("Updated reserved container " - + container.getContainer().getId() + " on node " + this - + " for application attempt " - + application.getApplicationAttemptId()); - } - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Reserved container " - + container.getContainer().getId() + " on node " + this - + " for application attempt " - + application.getApplicationAttemptId()); + try (AutoCloseableRWLock l = acquireWriteLock()) { + // Check if it's already reserved + RMContainer reservedContainer = getReservedContainer(); + if (reservedContainer != null) { + // Sanity check + if (!container.getContainer().getNodeId().equals(getNodeID())) { + throw new IllegalStateException("Trying to reserve" + + " container " + container + + " on node " + container.getReservedNode() + + " when currently" + " reserved resource " + reservedContainer + + " on node " + reservedContainer.getReservedNode()); + } + + // Cannot reserve more than one application attempt on a given node! + // Reservation is still against attempt. 
+ if (!reservedContainer.getContainer().getId().getApplicationAttemptId() + .equals( + container.getContainer().getId().getApplicationAttemptId())) { + throw new IllegalStateException("Trying to reserve" + + " container " + container + + " for application " + application.getApplicationAttemptId() + + " when currently" + + " reserved container " + reservedContainer + + " on node " + this); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Updated reserved container " + + container.getContainer().getId() + " on node " + this + + " for application attempt " + + application.getApplicationAttemptId()); + } + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Reserved container " + + container.getContainer().getId() + " on node " + this + + " for application attempt " + + application.getApplicationAttemptId()); + } } + setReservedContainer(container); } - setReservedContainer(container); } @Override - public synchronized void unreserveResource( + public void unreserveResource( SchedulerApplicationAttempt application) { - // adding NP checks as this can now be called for preemption - if (getReservedContainer() != null - && getReservedContainer().getContainer() != null - && getReservedContainer().getContainer().getId() != null - && getReservedContainer().getContainer().getId() + try (AutoCloseableRWLock l = acquireWriteLock()) { + // adding NP checks as this can now be called for preemption + if (getReservedContainer() != null + && getReservedContainer().getContainer() != null + && getReservedContainer().getContainer().getId() != null + && getReservedContainer().getContainer().getId() .getApplicationAttemptId() != null) { - // Cannot unreserve for wrong application... 
- ApplicationAttemptId reservedApplication = - getReservedContainer().getContainer().getId() - .getApplicationAttemptId(); - if (!reservedApplication.equals( - application.getApplicationAttemptId())) { - throw new IllegalStateException("Trying to unreserve " + - " for application " + application.getApplicationAttemptId() + - " when currently reserved " + - " for application " + reservedApplication.getApplicationId() + - " on node " + this); + // Cannot unreserve for wrong application... + ApplicationAttemptId reservedApplication = + getReservedContainer().getContainer().getId() + .getApplicationAttemptId(); + if (!reservedApplication.equals( + application.getApplicationAttemptId())) { + throw new IllegalStateException("Trying to unreserve " + + " for application " + application.getApplicationAttemptId() + + " when currently reserved " + + " for application " + reservedApplication.getApplicationId() + + " on node " + this); + } } + setReservedContainer(null); } - setReservedContainer(null); } // According to decisions from preemption policy, mark the container to killable - public synchronized void markContainerToKillable(ContainerId containerId) { - RMContainer c = getContainer(containerId); - if (c != null && !killableContainers.containsKey(containerId)) { - killableContainers.put(containerId, c); - Resources.addTo(totalKillableResources, c.getAllocatedResource()); + public void markContainerToKillable(ContainerId containerId) { + try (AutoCloseableRWLock l = acquireWriteLock()) { + RMContainer c = getContainer(containerId); + if (c != null && !killableContainers.containsKey(containerId)) { + killableContainers.put(containerId, c); + Resources.addTo(totalKillableResources, c.getAllocatedResource()); + } } } // According to decisions from preemption policy, mark the container to // non-killable - public synchronized void markContainerToNonKillable(ContainerId containerId) { - RMContainer c = getContainer(containerId); - if (c != null && 
killableContainers.containsKey(containerId)) { - killableContainers.remove(containerId); - Resources.subtractFrom(totalKillableResources, c.getAllocatedResource()); + public void markContainerToNonKillable(ContainerId containerId) { + try (AutoCloseableRWLock l = acquireWriteLock()) { + RMContainer c = getContainer(containerId); + if (c != null && killableContainers.containsKey(containerId)) { + killableContainers.remove(containerId); + Resources + .subtractFrom(totalKillableResources, c.getAllocatedResource()); + } } } @Override - protected synchronized void updateResourceForReleasedContainer( - Container container) { - super.updateResourceForReleasedContainer(container); - if (killableContainers.containsKey(container.getId())) { - Resources.subtractFrom(totalKillableResources, container.getResource()); - killableContainers.remove(container.getId()); + protected void updateResourceForReleasedContainer(Container container) { + try (AutoCloseableRWLock l = acquireWriteLock()) { + super.updateResourceForReleasedContainer(container); + if (killableContainers.containsKey(container.getId())) { + Resources.subtractFrom(totalKillableResources, container.getResource()); + killableContainers.remove(container.getId()); + } } } - public synchronized Resource getTotalKillableResources() { - return totalKillableResources; + public Resource getTotalKillableResources() { + try (AutoCloseableRWLock l = acquireReadLock()) { + return totalKillableResources; + } } - public synchronized Map getKillableContainers() { - return Collections.unmodifiableMap(killableContainers); + public Map getKillableContainers() { + try (AutoCloseableRWLock l = acquireReadLock()) { + return Collections.unmodifiableMap(killableContainers); + } } - protected synchronized void allocateContainer(RMContainer rmContainer, + protected void allocateContainer(RMContainer rmContainer, boolean launchedOnNode) { - super.allocateContainer(rmContainer, launchedOnNode); - - final Container container = 
rmContainer.getContainer(); + Container container = null; + try (AutoCloseableRWLock l = acquireWriteLock()) { + super.allocateContainerInternal(rmContainer, launchedOnNode); + container = rmContainer.getContainer(); + } LOG.info("Assigned container " + container.getId() + " of capacity " + container.getResource() + " on host " + getRMNode().getNodeAddress() + ", which has " + getNumContainers() + " containers, " diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java index 8ae1e2a2f99..55e1918f473 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java @@ -20,6 +20,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; +import org.apache.hadoop.util.AutoCloseableRWLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -85,66 +86,73 @@ Resource getTotalReserved() { } @Override - public synchronized void reserveResource( - SchedulerApplicationAttempt application, SchedulerRequestKey schedulerKey, - RMContainer container) { - // Check if it's already reserved - RMContainer reservedContainer = getReservedContainer(); - if (reservedContainer != null) { - // Sanity check - if (!container.getContainer().getNodeId().equals(getNodeID())) { - throw new IllegalStateException("Trying to reserve" + - " container " + container 
+ - " on node " + container.getReservedNode() + - " when currently" + " reserved resource " + reservedContainer + - " on node " + reservedContainer.getReservedNode()); - } - - // Cannot reserve more than one application on a given node! - if (!reservedContainer.getContainer().getId().getApplicationAttemptId() - .equals(container.getContainer().getId().getApplicationAttemptId())) { - throw new IllegalStateException("Trying to reserve" + - " container " + container + - " for application " + application.getApplicationId() + - " when currently" + - " reserved container " + reservedContainer + - " on node " + this); - } + public void reserveResource(SchedulerApplicationAttempt application, + SchedulerRequestKey schedulerKey, + RMContainer container) { + try (AutoCloseableRWLock l = getLock().acquireWrite()) { + // Check if it's already reserved + RMContainer reservedContainer = getReservedContainer(); + if (reservedContainer != null) { + // Sanity check + if (!container.getContainer().getNodeId().equals(getNodeID())) { + throw new IllegalStateException("Trying to reserve" + + " container " + container + + " on node " + container.getReservedNode() + + " when currently" + " reserved resource " + reservedContainer + + " on node " + reservedContainer.getReservedNode()); + } + + // Cannot reserve more than one application on a given node! 
+ if (!reservedContainer.getContainer().getId().getApplicationAttemptId() + .equals( + container.getContainer().getId().getApplicationAttemptId())) { + throw new IllegalStateException("Trying to reserve" + + " container " + container + + " for application " + application.getApplicationId() + + " when currently" + + " reserved container " + reservedContainer + + " on node " + this); + } - LOG.info("Updated reserved container " + container.getContainer().getId() - + " on node " + this + " for application " - + application.getApplicationId()); - } else { - LOG.info("Reserved container " + container.getContainer().getId() - + " on node " + this + " for application " - + application.getApplicationId()); + LOG.info( + "Updated reserved container " + container.getContainer().getId() + + " on node " + this + " for application " + + application.getApplicationId()); + } else { + LOG.info("Reserved container " + container.getContainer().getId() + + " on node " + this + " for application " + + application.getApplicationId()); + } + setReservedContainer(container); + this.reservedAppSchedulable = (FSAppAttempt) application; } - setReservedContainer(container); - this.reservedAppSchedulable = (FSAppAttempt) application; } @Override - public synchronized void unreserveResource( - SchedulerApplicationAttempt application) { - // Cannot unreserve for wrong application... - ApplicationAttemptId reservedApplication = - getReservedContainer().getContainer().getId() - .getApplicationAttemptId(); - if (!reservedApplication.equals( - application.getApplicationAttemptId())) { - throw new IllegalStateException("Trying to unreserve " + - " for application " + application.getApplicationId() + - " when currently reserved " + - " for application " + reservedApplication.getApplicationId() + - " on node " + this); + public void unreserveResource(SchedulerApplicationAttempt application) { + try (AutoCloseableRWLock l = acquireWriteLock()) { + // Cannot unreserve for wrong application... 
+ ApplicationAttemptId reservedApplication = + getReservedContainer().getContainer().getId() + .getApplicationAttemptId(); + if (!reservedApplication.equals( + application.getApplicationAttemptId())) { + throw new IllegalStateException("Trying to unreserve " + + " for application " + application.getApplicationId() + + " when currently reserved " + + " for application " + reservedApplication.getApplicationId() + + " on node " + this); + } + + setReservedContainer(null); + this.reservedAppSchedulable = null; } - - setReservedContainer(null); - this.reservedAppSchedulable = null; } - synchronized FSAppAttempt getReservedAppSchedulable() { - return reservedAppSchedulable; + FSAppAttempt getReservedAppSchedulable() { + try (AutoCloseableRWLock l = getLock().acquireRead()) { + return reservedAppSchedulable; + } } /** @@ -153,41 +161,42 @@ synchronized FSAppAttempt getReservedAppSchedulable() { * @return if any resources were allocated */ @VisibleForTesting - synchronized LinkedHashMap getPreemptionList() { - cleanupPreemptionList(); - return new LinkedHashMap<>(resourcesPreemptedForApp); + LinkedHashMap getPreemptionList() { + try (AutoCloseableRWLock l = getLock().acquireWrite()) { + cleanupPreemptionList(); + return new LinkedHashMap<>(resourcesPreemptedForApp); + } } /** * Returns whether a preemption is tracked on the node for the specified app. * @return if preempted containers are reserved for the app */ - synchronized boolean isPreemptedForApp(FSAppAttempt app){ - return resourcesPreemptedForApp.containsKey(app); + boolean isPreemptedForApp(FSAppAttempt app){ + try (AutoCloseableRWLock l = getLock().acquireRead()) { + return resourcesPreemptedForApp.containsKey(app); + } } /** * Remove apps that have their preemption requests fulfilled. 
+ * Write lock must be grabbed */ private void cleanupPreemptionList() { // Synchronize separately to avoid potential deadlocks // This may cause delayed deletion of reservations LinkedList candidates; - synchronized (this) { - candidates = Lists.newLinkedList(resourcesPreemptedForApp.keySet()); - } + candidates = Lists.newLinkedList(resourcesPreemptedForApp.keySet()); for (FSAppAttempt app : candidates) { if (app.isStopped() || !app.isStarved() || (Resources.isNone(app.getFairshareStarvation()) && Resources.isNone(app.getMinshareStarvation()))) { // App does not need more resources - synchronized (this) { - Resource removed = resourcesPreemptedForApp.remove(app); - if (removed != null) { - Resources.subtractFrom(totalResourcesPreempted, - removed); - appIdToAppMap.remove(app.getApplicationAttemptId()); - } + Resource removed = resourcesPreemptedForApp.remove(app); + if (removed != null) { + Resources.subtractFrom(totalResourcesPreempted, + removed); + appIdToAppMap.remove(app.getApplicationAttemptId()); } } } @@ -205,14 +214,13 @@ void addContainersForPreemption(Collection containers, FSAppAttempt app) { Resource appReserved = Resources.createResource(0); - - for(RMContainer container : containers) { - if(containersForPreemption.add(container)) { - Resources.addTo(appReserved, container.getAllocatedResource()); + try (AutoCloseableRWLock l = acquireWriteLock()) { + for (RMContainer container : containers) { + if (containersForPreemption.add(container)) { + Resources.addTo(appReserved, container.getAllocatedResource()); + } } - } - synchronized (this) { if (!Resources.isNone(appReserved)) { Resources.addTo(totalResourcesPreempted, appReserved); @@ -221,6 +229,7 @@ void addContainersForPreemption(Collection containers, putIfAbsent(app, Resource.newInstance(0, 0)); Resources.addTo(resourcesPreemptedForApp.get(app), appReserved); } + } } @@ -238,36 +247,39 @@ void addContainersForPreemption(Collection containers, * @param launchedOnNode True if the container has been 
launched */ @Override - protected synchronized void allocateContainer(RMContainer rmContainer, - boolean launchedOnNode) { - super.allocateContainer(rmContainer, launchedOnNode); - if (LOG.isDebugEnabled()) { - final Container container = rmContainer.getContainer(); - LOG.debug("Assigned container " + container.getId() + " of capacity " - + container.getResource() + " on host " + getRMNode().getNodeAddress() - + ", which has " + getNumContainers() + " containers, " - + getAllocatedResource() + " used and " + getUnallocatedResource() - + " available after allocation"); - } + protected void allocateContainer(RMContainer rmContainer, + boolean launchedOnNode) { + try (AutoCloseableRWLock l = acquireWriteLock()) { + super.allocateContainerInternal(rmContainer, launchedOnNode); + if (LOG.isDebugEnabled()) { + final Container container = rmContainer.getContainer(); + LOG.debug("Assigned container " + container.getId() + " of capacity " + + container.getResource() + " on host " + getRMNode() + .getNodeAddress() + + ", which has " + getNumContainers() + " containers, " + + getAllocatedResource() + " used and " + getUnallocatedResource() + + " available after allocation"); + } - Resource allocated = rmContainer.getAllocatedResource(); - if (!Resources.isNone(allocated)) { - // check for satisfied preemption request and update bookkeeping - FSAppAttempt app = - appIdToAppMap.get(rmContainer.getApplicationAttemptId()); - if (app != null) { - Resource reserved = resourcesPreemptedForApp.get(app); - Resource fulfilled = Resources.componentwiseMin(reserved, allocated); - Resources.subtractFrom(reserved, fulfilled); - Resources.subtractFrom(totalResourcesPreempted, fulfilled); - if (Resources.isNone(reserved)) { - // No more preempted containers - resourcesPreemptedForApp.remove(app); - appIdToAppMap.remove(rmContainer.getApplicationAttemptId()); + Resource allocated = rmContainer.getAllocatedResource(); + if (!Resources.isNone(allocated)) { + // check for satisfied preemption 
request and update bookkeeping + FSAppAttempt app = + appIdToAppMap.get(rmContainer.getApplicationAttemptId()); + if (app != null) { + Resource reserved = resourcesPreemptedForApp.get(app); + Resource fulfilled = Resources.componentwiseMin(reserved, allocated); + Resources.subtractFrom(reserved, fulfilled); + Resources.subtractFrom(totalResourcesPreempted, fulfilled); + if (Resources.isNone(reserved)) { + // No more preempted containers + resourcesPreemptedForApp.remove(app); + appIdToAppMap.remove(rmContainer.getApplicationAttemptId()); + } } + } else { + LOG.error("Allocated empty container" + rmContainer.getContainerId()); } - } else { - LOG.error("Allocated empty container" + rmContainer.getContainerId()); } } @@ -279,12 +291,14 @@ protected synchronized void allocateContainer(RMContainer rmContainer, * @param releasedByNode whether the release originates from a node update. */ @Override - public synchronized void releaseContainer(ContainerId containerId, - boolean releasedByNode) { - RMContainer container = getContainer(containerId); - super.releaseContainer(containerId, releasedByNode); - if (container != null) { - containersForPreemption.remove(container); + public void releaseContainer(ContainerId containerId, + boolean releasedByNode) { + try (AutoCloseableRWLock l = acquireWriteLock()) { + RMContainer container = getContainer(containerId); + super.releaseContainerInternal(containerId, releasedByNode); + if (container != null) { + containersForPreemption.remove(container); + } } } }