diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index 8149ab470ff..c6c47ea8eac 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -29,6 +29,7 @@ import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -137,15 +138,22 @@ private AtomicLong firstAllocationRequestSentTime = new AtomicLong(0); private AtomicLong firstContainerAllocatedTime = new AtomicLong(0); - protected List newlyAllocatedContainers = new ArrayList<>(); - protected List tempContainerToKill = new ArrayList<>(); - protected Map newlyPromotedContainers = new HashMap<>(); - protected Map newlyDemotedContainers = new HashMap<>(); - protected Map newlyDecreasedContainers = new HashMap<>(); - protected Map newlyIncreasedContainers = new HashMap<>(); + protected List newlyAllocatedContainers = + new CopyOnWriteArrayList<>(); + protected List tempContainerToKill = + new CopyOnWriteArrayList<>(); + protected Map newlyPromotedContainers = + new ConcurrentHashMap<>(); + protected Map newlyDemotedContainers = + new ConcurrentHashMap<>(); + protected Map newlyDecreasedContainers = + new ConcurrentHashMap<>(); + protected Map newlyIncreasedContainers = + new ConcurrentHashMap<>(); protected Set updatedNMTokens = new HashSet<>(); - protected List updateContainerErrors = new ArrayList<>(); + protected List updateContainerErrors = + new CopyOnWriteArrayList<>(); //Keeps track of recovered containers from previous attempt which haven't //been reported to the AM. @@ -824,22 +832,36 @@ private void updateNMToken(Container container) { } } - public synchronized void addToNewlyDemotedContainers(ContainerId containerId, + public void addToNewlyDemotedContainers(ContainerId containerId, RMContainer rmContainer) { - newlyDemotedContainers.put(containerId, rmContainer); + writeLock.lock(); + try { + newlyDemotedContainers.put(containerId, rmContainer); + } finally { + writeLock.unlock(); + } } - public synchronized void addToNewlyDecreasedContainers( - ContainerId containerId, RMContainer rmContainer) { - newlyDecreasedContainers.put(containerId, rmContainer); + public void addToNewlyDecreasedContainers(ContainerId containerId, + RMContainer rmContainer) { + writeLock.lock(); + try { + newlyDecreasedContainers.put(containerId, rmContainer); + } finally { + writeLock.unlock(); + } } - protected synchronized void addToUpdateContainerErrors( - UpdateContainerError error) { - updateContainerErrors.add(error); + protected void addToUpdateContainerErrors(UpdateContainerError error) { + writeLock.lock(); + try { + updateContainerErrors.add(error); + } finally { + writeLock.unlock(); + } } - protected synchronized void addToNewlyAllocatedContainers( + protected void addToNewlyAllocatedContainers( SchedulerNode node, RMContainer rmContainer) { ContainerId matchedContainerId = getUpdateContext().matchContainerToOutstandingIncreaseReq( @@ -856,13 +878,14 @@ protected synchronized void addToNewlyAllocatedContainers( RMContainer existingContainer = getRMContainer(matchedContainerId); // If this container was already GUARANTEED, then it is an // increase, else its a promotion - if (existingContainer == null || - EnumSet.of(RMContainerState.COMPLETED, RMContainerState.KILLED, - RMContainerState.EXPIRED, RMContainerState.RELEASED).contains( - existingContainer.getState())) { + if (existingContainer == null || EnumSet + .of(RMContainerState.COMPLETED, RMContainerState.KILLED, + RMContainerState.EXPIRED, RMContainerState.RELEASED) + .contains(existingContainer.getState())) { tempContainerToKill.add(rmContainer); } else { - if (ExecutionType.GUARANTEED == existingContainer.getExecutionType()) { + if (ExecutionType.GUARANTEED == existingContainer + .getExecutionType()) { newlyIncreasedContainers.put(matchedContainerId, rmContainer); } else { newlyPromotedContainers.put(matchedContainerId, rmContainer); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index 71aa865ccfc..8b66be93087 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -90,6 +90,7 @@ import org.apache.hadoop.yarn.util.resource.Resources; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ConcurrentHashMultiset; /** * Represents an application attempt from the viewpoint of the FIFO or Capacity @@ -101,8 +102,8 @@ private static final Logger LOG = LoggerFactory.getLogger(FiCaSchedulerApp.class); - private final Set containersToPreempt = - new HashSet(); + private final ConcurrentHashMultiset containersToPreempt = + ConcurrentHashMultiset.create(); private CapacityHeadroomProvider headroomProvider; @@ -792,39 +793,32 @@ public void markContainerForPreemption(ContainerId cont) { */ public Allocation getAllocation(ResourceCalculator resourceCalculator, Resource clusterResource, Resource minimumAllocation) { - writeLock.lock(); - try { - Set currentContPreemption = Collections.unmodifiableSet( - new HashSet(containersToPreempt)); - containersToPreempt.clear(); - Resource tot = Resource.newInstance(0, 0); - for (ContainerId c : currentContPreemption) { - Resources.addTo(tot, liveContainers.get(c).getContainer() - .getResource()); - } - int numCont = (int) Math.ceil( - Resources.divide(rc, clusterResource, tot, minimumAllocation)); - ResourceRequest rr = ResourceRequest.newBuilder() - .priority(Priority.UNDEFINED).resourceName(ResourceRequest.ANY) - .capability(minimumAllocation).numContainers(numCont).build(); - List previousAttemptContainers = - pullPreviousAttemptContainers(); - List newlyAllocatedContainers = pullNewlyAllocatedContainers(); - List newlyIncreasedContainers = pullNewlyIncreasedContainers(); - List newlyDecreasedContainers = pullNewlyDecreasedContainers(); - List newlyPromotedContainers = pullNewlyPromotedContainers(); - List newlyDemotedContainers = pullNewlyDemotedContainers(); - List updatedNMTokens = pullUpdatedNMTokens(); - Resource headroom = getHeadroom(); - setApplicationHeadroomForMetrics(headroom); - return new Allocation(newlyAllocatedContainers, headroom, null, - currentContPreemption, Collections.singletonList(rr), updatedNMTokens, - newlyIncreasedContainers, newlyDecreasedContainers, - newlyPromotedContainers, newlyDemotedContainers, - previousAttemptContainers); - } finally { - writeLock.unlock(); + Set currentContPreemption = Collections + .unmodifiableSet(new HashSet(containersToPreempt)); + containersToPreempt.clear(); + Resource tot = Resource.newInstance(0, 0); + for (ContainerId c : currentContPreemption) { + Resources.addTo(tot, liveContainers.get(c).getContainer().getResource()); } + int numCont = (int) Math + .ceil(Resources.divide(rc, clusterResource, tot, minimumAllocation)); + ResourceRequest rr = ResourceRequest.newBuilder() + .priority(Priority.UNDEFINED).resourceName(ResourceRequest.ANY) + .capability(minimumAllocation).numContainers(numCont).build(); + List previousAttemptContainers = pullPreviousAttemptContainers(); + List newlyAllocatedContainers = pullNewlyAllocatedContainers(); + List newlyIncreasedContainers = pullNewlyIncreasedContainers(); + List newlyDecreasedContainers = pullNewlyDecreasedContainers(); + List newlyPromotedContainers = pullNewlyPromotedContainers(); + List newlyDemotedContainers = pullNewlyDemotedContainers(); + List updatedNMTokens = pullUpdatedNMTokens(); + Resource headroom = getHeadroom(); + setApplicationHeadroomForMetrics(headroom); + return new Allocation(newlyAllocatedContainers, headroom, null, + currentContPreemption, Collections.singletonList(rr), updatedNMTokens, + newlyIncreasedContainers, newlyDecreasedContainers, + newlyPromotedContainers, newlyDemotedContainers, + previousAttemptContainers); } @VisibleForTesting