diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java
index b7bb48e..211fe3a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java
@@ -642,8 +642,8 @@ public void run() {
   private class TimerMonitorTask extends TimerTask {
     @Override
     public void run() {
+      timerTasksMonitorWriteLock.lock();
       try {
-        timerTasksMonitorWriteLock.lock();
         monitorTimerTasks();
       } finally {
         timerTasksMonitorWriteLock.unlock();
@@ -926,8 +926,8 @@ private void createAndStartTimerTasks() {
   }

   private void checkAndStartTimeTasks() {
+    this.timerTasksMonitorReadLock.lock();
     try {
-      this.timerTasksMonitorReadLock.lock();
       this.timeStampOfLastWrite = Time.monotonicNow();
       if(!timerTaskStarted) {
         try {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java
index 66e945f..f159752 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java
@@ -767,8 +767,8 @@ public void replaceLabelsOnNode(Map> replaceLabelsToNode)
   @SuppressWarnings("unchecked")
   private Map> generateNodeLabelsInfoPerNode(Class type) {
+    readLock.lock();
     try {
-      readLock.lock();
       Map> nodeToLabels = new HashMap<>();
       for (Entry entry : nodeCollections.entrySet()) {
         String hostName = entry.getKey();
@@ -810,8 +810,8 @@ public void replaceLabelsOnNode(Map> replaceLabelsToNode)
    * @return set of nodes with no labels
    */
   public Set getNodesWithoutALabel() {
+    readLock.lock();
     try {
-      readLock.lock();
       Set nodes = new HashSet<>();
       for (Host host : nodeCollections.values()) {
         for (NodeId nodeId : host.nms.keySet()) {
@@ -833,8 +833,8 @@ public void replaceLabelsOnNode(Map> replaceLabelsToNode)
    * @return labels to nodes map
    */
   public Map> getLabelsToNodes() {
+    readLock.lock();
     try {
-      readLock.lock();
       return getLabelsToNodes(labelCollections.keySet());
     } finally {
       readLock.unlock();
@@ -849,8 +849,8 @@ public void replaceLabelsOnNode(Map> replaceLabelsToNode)
    * @return labels to nodes map
    */
   public Map> getLabelsToNodes(Set labels) {
+    readLock.lock();
     try {
-      readLock.lock();
       Map> labelsToNodes =
           getLabelsToNodesMapping(labels, String.class);
       return Collections.unmodifiableMap(labelsToNodes);
@@ -866,8 +866,8 @@ public void replaceLabelsOnNode(Map> replaceLabelsToNode)
    * @return labels to nodes map
    */
   public Map> getLabelsInfoToNodes() {
+    readLock.lock();
     try {
-      readLock.lock();
       return getLabelsInfoToNodes(labelCollections.keySet());
     } finally {
       readLock.unlock();
@@ -923,8 +923,8 @@ public void replaceLabelsOnNode(Map> replaceLabelsToNode)
    * @return existing valid labels in repository
    */
   public Set getClusterNodeLabelNames() {
+    readLock.lock();
     try {
-      readLock.lock();
       Set labels = new HashSet(labelCollections.keySet());
       labels.remove(NO_LABEL);
       return Collections.unmodifiableSet(labels);
@@ -934,8 +934,8 @@ public void replaceLabelsOnNode(Map> replaceLabelsToNode)
   }

   public List getClusterNodeLabels() {
+    readLock.lock();
     try {
-      readLock.lock();
       List nodeLabels = new ArrayList<>();
       for (RMNodeLabel label : labelCollections.values()) {
         if (!label.getLabelName().equals(NO_LABEL)) {
@@ -953,8 +953,8 @@ public boolean isExclusiveNodeLabel(String nodeLabel) throws IOException {
     if (nodeLabel.equals(NO_LABEL)) {
       return noNodeLabel.getIsExclusive();
     }
+    readLock.lock();
     try {
-      readLock.lock();
       RMNodeLabel label = labelCollections.get(nodeLabel);
       if (label == null) {
         String message =
@@ -1065,8 +1065,8 @@ protected Node getNMInNodeSet(NodeId nodeId, Map map,
   }

   public Set getLabelsInfoByNode(NodeId nodeId) {
+    readLock.lock();
     try {
-      readLock.lock();
       Set labels = getLabelsByNode(nodeId, nodeCollections);
       if (labels.isEmpty()) {
         return EMPTY_NODELABEL_SET;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NonAppendableFSNodeLabelStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NonAppendableFSNodeLabelStore.java
index 989f027..3c0df2e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NonAppendableFSNodeLabelStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NonAppendableFSNodeLabelStore.java
@@ -92,10 +92,10 @@ public void removeClusterNodeLabels(Collection labels)
   private void writeNewMirror() throws IOException {
     ReentrantReadWriteLock.ReadLock readLock = mgr.readLock;
+    readLock.lock();
     try {
       // Acquire readlock to make sure we get cluster node labels and
       // node-to-labels mapping atomically.
-      readLock.lock();
       List nodeLabels = mgr.getClusterNodeLabels();
       Map> nodeToLabels = mgr.getNodeLabels();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ConfiguredYarnAuthorizer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ConfiguredYarnAuthorizer.java
index 36c5214..615ecb0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ConfiguredYarnAuthorizer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ConfiguredYarnAuthorizer.java
@@ -57,8 +57,8 @@ public void init(Configuration conf) {
   @Override
   public void setPermission(List permissions, UserGroupInformation user) {
+    writeLock.lock();
     try {
-      writeLock.lock();
       for (Permission perm : permissions) {
         allAcls.put(perm.getTarget(), perm.getAcls());
       }
@@ -94,8 +94,8 @@ private boolean checkPermissionInternal(AccessType accessType,
   @Override
   public boolean checkPermission(AccessRequest accessRequest) {
+    readLock.lock();
     try {
-      readLock.lock();
       return checkPermissionInternal(accessRequest.getAccessType(),
           accessRequest.getEntity(), accessRequest.getUser());
     } finally {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
index 0581878..5448740 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
@@ -482,8 +482,8 @@ protected void logOutput(String output) {
    * @return the path of the pid-file for the given containerId.
    */
   protected Path getPidFilePath(ContainerId containerId) {
+    readLock.lock();
     try {
-      readLock.lock();
       return (this.pidFiles.get(containerId));
     } finally {
       readLock.unlock();
@@ -633,9 +633,8 @@ protected Path getPidFilePath(ContainerId containerId) {
    * @return true if the container is active
    */
   protected boolean isContainerActive(ContainerId containerId) {
+    readLock.lock();
     try {
-      readLock.lock();
-
       return (this.pidFiles.containsKey(containerId));
     } finally {
       readLock.unlock();
@@ -650,8 +649,8 @@ protected boolean isContainerActive(ContainerId containerId) {
    * of the launched process
    */
   public void activateContainer(ContainerId containerId, Path pidFilePath) {
+    writeLock.lock();
     try {
-      writeLock.lock();
       this.pidFiles.put(containerId, pidFilePath);
     } finally {
       writeLock.unlock();
@@ -685,8 +684,8 @@ public void activateContainer(ContainerId containerId, Path pidFilePath) {
    * @param containerId the container ID
    */
   public void deactivateContainer(ContainerId containerId) {
+    writeLock.lock();
     try {
-      writeLock.lock();
       this.pidFiles.remove(containerId);
     } finally {
       writeLock.unlock();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
index 80863a1..a314b1b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
@@ -632,8 +632,8 @@ public String toString() {
   @VisibleForTesting
   public LogAggregationContext getLogAggregationContext() {
+    this.readLock.lock();
     try {
-      this.readLock.lock();
       return this.logAggregationContext;
     } finally {
       this.readLock.unlock();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandlerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandlerImpl.java
index 85b01cd..f4246c5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandlerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandlerImpl.java
@@ -361,17 +361,6 @@ public void initializeCGroupController(CGroupController controller) throws
     if (enableCGroupMount) {
       // We have a controller that needs to be mounted
       mountCGroupController(controller);
-    } else {
-      String controllerPath = getControllerPath(controller);
-
-      if (controllerPath == null) {
-        throw new ResourceHandlerException(
-            String.format("Controller %s not mounted."
-                + " You either need to mount it with %s"
-                + " or mount cgroups before launching Yarn",
-                controller.getName(), YarnConfiguration.
-                NM_LINUX_CONTAINER_CGROUPS_MOUNT));
-      }
     }

     // We are working with a pre-mounted contoller
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java
index 98f4e21..1e2f1bd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java
@@ -115,8 +115,8 @@ public String toString() {
         .append(getState() == ResourceState.LOCALIZED
             ? getLocalPath() + "," + getSize() : "pending").append(",[");
+    this.readLock.lock();
     try {
-      this.readLock.lock();
       for (ContainerId c : ref) {
         sb.append("(").append(c.toString()).append(")");
       }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
index 37fe77a..39d1279 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
@@ -1118,8 +1118,8 @@ public void testFixedSizeThreadPool() throws Exception {
     final Lock rLock = rwLock.readLock();
     final Lock wLock = rwLock.writeLock();

+    wLock.lock();
     try {
-      wLock.lock();
       Runnable runnable = new Runnable() {
         @Override
         public void run() {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
index fc8ad2b..e9cb443 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
@@ -476,10 +476,9 @@ private TempQueuePerPartition cloneQueues(CSQueue curQueue,
       Resource partitionResource, String partitionToLookAt) {
     TempQueuePerPartition ret;
     ReadLock readLock = curQueue.getReadLock();
+    // Acquire a read lock from Parent/LeafQueue.
+    readLock.lock();
     try {
-      // Acquire a read lock from Parent/LeafQueue.
-      readLock.lock();
-
       String queueName = curQueue.getQueueName();
       QueueCapacities qc = curQueue.getQueueCapacities();
       float absCap = qc.getAbsoluteCapacity(partitionToLookAt);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java
index 507f696..ca2a06e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java
@@ -70,10 +70,9 @@ protected void serviceInit(Configuration conf) throws Exception {
   @Override
   public void addLabelsToNode(Map> addedLabelsToNode)
-      throws IOException {
+      throws IOException {
+    writeLock.lock();
     try {
-      writeLock.lock();
-
       // get nodesCollection before edition
       Map before = cloneNodeMap(addedLabelsToNode.keySet());
@@ -112,8 +111,9 @@ protected void checkRemoveFromClusterNodeLabelsOfQueue(
   @Override
   public void removeFromClusterNodeLabels(Collection labelsToRemove)
       throws IOException {
+
+    writeLock.lock();
     try {
-      writeLock.lock();
       if (!isInitNodeLabelStoreInProgress()) {
         // We cannot remove node labels from collection when some queue(s) are
         // using any of them.
@@ -137,8 +137,9 @@ public void removeFromClusterNodeLabels(Collection labelsToRemove)
   @Override
   public void addToCluserNodeLabels(Collection labels)
       throws IOException {
+
+    writeLock.lock();
     try {
-      writeLock.lock();
       super.addToCluserNodeLabels(labels);
     } finally {
       writeLock.unlock();
@@ -149,9 +150,9 @@ public void addToCluserNodeLabels(Collection labels)
   public void removeLabelsFromNode(Map> removeLabelsFromNode)
       throws IOException {

-    try {
-      writeLock.lock();
+    writeLock.lock();
+    try {
       // get nodesCollection before edition
       Map before = cloneNodeMap(removeLabelsFromNode.keySet());
@@ -171,9 +172,9 @@ public void addToCluserNodeLabels(Collection labels)
   @Override
   public void replaceLabelsOnNode(Map> replaceLabelsToNode)
       throws IOException {

-    try {
-      writeLock.lock();
+    writeLock.lock();
+    try {
       Map> effectiveModifiedLabelMappings =
           getModifiedNodeLabelsMappings(replaceLabelsToNode);
@@ -230,9 +231,9 @@ public void replaceLabelsOnNode(Map> replaceLabelsToNode)
    * will update running nodes resource
    */
   public void activateNode(NodeId nodeId, Resource resource) {
+
+    writeLock.lock();
     try {
-      writeLock.lock();
-
       // save if we have a node before
       Map before = cloneNodeMap(ImmutableSet.of(nodeId));
@@ -273,9 +274,8 @@ public void activateNode(NodeId nodeId, Resource resource) {
    * Following methods are used for setting if a node unregistered to RM
    */
   public void deactivateNode(NodeId nodeId) {
+    writeLock.lock();
     try {
-      writeLock.lock();
-
       // save if we have a node before
       Map before = cloneNodeMap(ImmutableSet.of(nodeId));
       Node nm = getNMInNodeSet(nodeId);
@@ -314,8 +314,8 @@ public void updateNodeResource(NodeId node, Resource newResource) {
   }

   public void reinitializeQueueLabels(Map> queueToLabels) {
+    writeLock.lock();
     try {
-      writeLock.lock();
       // clear before set
       this.queueCollections.clear();
@@ -347,8 +347,9 @@ public void reinitializeQueueLabels(Map> queueToLabels) {
   public Resource getQueueResource(String queueName, Set queueLabels,
       Resource clusterResource) {
+
+    readLock.lock();
     try {
-      readLock.lock();
       if (queueLabels.contains(ANY)) {
         return clusterResource;
       }
@@ -369,8 +370,8 @@ public int getActiveNMCountPerLabel(String label) {
     if (label == null) {
       return 0;
     }
+    readLock.lock();
     try {
-      readLock.lock();
       RMNodeLabel labelInfo = labelCollections.get(label);
       return (labelInfo == null) ? 0 : labelInfo.getNumActiveNMs();
     } finally {
@@ -379,8 +380,8 @@ public int getActiveNMCountPerLabel(String label) {
   }

   public Set getLabelsOnNode(NodeId nodeId) {
+    readLock.lock();
     try {
-      readLock.lock();
       Set nodeLabels = getLabelsByNode(nodeId);
       return Collections.unmodifiableSet(nodeLabels);
     } finally {
@@ -389,8 +390,8 @@ public int getActiveNMCountPerLabel(String label) {
   }

   public boolean containsNodeLabel(String label) {
+    readLock.lock();
     try {
-      readLock.lock();
       return label != null
           && (label.isEmpty() || labelCollections.containsKey(label));
     } finally {
@@ -522,8 +523,9 @@ public Resource getResourceByLabel(String label, Resource clusterResource) {
     if (label.equals(NO_LABEL)) {
       return noNodeLabel.getResource();
     }
+
+    readLock.lock();
     try {
-      readLock.lock();
       RMNodeLabel nodeLabel = labelCollections.get(label);
       if (nodeLabel == null) {
         return Resources.none();
@@ -572,8 +574,8 @@ public void setRMContext(RMContext rmContext) {
   }

   public List pullRMNodeLabelsInfo() {
+    readLock.lock();
     try {
-      readLock.lock();
       List infos = new ArrayList();
       for (Entry entry : labelCollections.entrySet()) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementManager.java
index 43a4deb..493125b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementManager.java
@@ -45,8 +45,8 @@ public PlacementManager() {
   }

   public void updateRules(List rules) {
+    writeLock.lock();
     try {
-      writeLock.lock();
       this.rules = rules;
     } finally {
       writeLock.unlock();
@@ -55,8 +55,8 @@ public void updateRules(List rules) {
   public void placeApplication(ApplicationSubmissionContext asc, String user)
       throws YarnException {
+    readLock.lock();
     try {
-      readLock.lock();
       if (null == rules || rules.isEmpty()) {
         return;
       }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index fa2f20c..16e083e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -1672,8 +1672,8 @@ public ReservationId getReservationId() {
   @Override
   public Map getLogAggregationReportsForApp() {
+    this.readLock.lock();
     try {
-      this.readLock.lock();
       if (!isLogAggregationFinished() && isAppInFinalState(this)
           && System.currentTimeMillis() > this.logAggregationStartTime
           + this.logAggregationStatusTimeout) {
@@ -1697,8 +1697,8 @@ public ReservationId getReservationId() {
   }

   public void aggregateLogReport(NodeId nodeId, LogAggregationReport report) {
+    this.writeLock.lock();
     try {
-      this.writeLock.lock();
       if (this.logAggregationEnabled && !isLogAggregationFinished()) {
         LogAggregationReport curReport = this.logAggregationStatus.get(nodeId);
         boolean stateChangedToFinal = false;
@@ -1747,8 +1747,8 @@ public void aggregateLogReport(NodeId nodeId, LogAggregationReport report) {
   @Override
   public LogAggregationStatus getLogAggregationStatusForAppReport() {
+    this.readLock.lock();
     try {
-      this.readLock.lock();
       if (! logAggregationEnabled) {
         return LogAggregationStatus.DISABLED;
       }
@@ -1912,8 +1912,8 @@ private void updateLogAggregationStatus(NodeId nodeId) {
   }

   public String getLogAggregationFailureMessagesForNM(NodeId nodeId) {
+    this.readLock.lock();
     try {
-      this.readLock.lock();
       List failureMessages =
           this.logAggregationFailureMessagesForNMs.get(nodeId);
       if (failureMessages == null || failureMessages.isEmpty()) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
index 4210c54..0883c66 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
@@ -2235,8 +2235,8 @@ public RMAppAttemptMetrics getRMAppAttemptMetrics() {
   @Override
   public long getFinishTime() {
+    this.readLock.lock();
     try {
-      this.readLock.lock();
       return this.finishTime;
     } finally {
       this.readLock.unlock();
@@ -2244,8 +2244,8 @@ public long getFinishTime() {
   }

   private void setFinishTime(long finishTime) {
+    this.writeLock.lock();
     try {
-      this.writeLock.lock();
       this.finishTime = finishTime;
     } finally {
       this.writeLock.unlock();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptMetrics.java
index e089050..096d7d6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptMetrics.java
@@ -68,8 +68,8 @@ public RMAppAttemptMetrics(ApplicationAttemptId attemptId,
   }

   public void updatePreemptionInfo(Resource resource, RMContainer container) {
+    writeLock.lock();
     try {
-      writeLock.lock();
       resourcePreempted = Resources.addTo(resourcePreempted, resource);
     } finally {
       writeLock.unlock();
@@ -92,8 +92,8 @@ public void updatePreemptionInfo(Resource resource, RMContainer container) {
   }

   public Resource getResourcePreempted() {
+    readLock.lock();
     try {
-      readLock.lock();
       return resourcePreempted;
     } finally {
       readLock.unlock();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
index 1e9463a..a29234e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
@@ -309,8 +309,8 @@ public SchedulerRequestKey getReservedSchedulerKey() {
   @Override
   public Resource getAllocatedResource() {
+    readLock.lock();
     try {
-      readLock.lock();
       return container.getResource();
     } finally {
       readLock.unlock();
@@ -319,8 +319,8 @@ public Resource getAllocatedResource() {
   @Override
   public Resource getLastConfirmedResource() {
+    readLock.lock();
     try {
-      readLock.lock();
       return this.lastConfirmedResource;
     } finally {
       readLock.unlock();
@@ -349,8 +349,8 @@ public long getCreationTime() {
   @Override
   public long getFinishTime() {
+    readLock.lock();
     try {
-      readLock.lock();
       return finishTime;
     } finally {
       readLock.unlock();
@@ -359,8 +359,8 @@ public long getFinishTime() {
   @Override
   public String getDiagnosticsInfo() {
+    readLock.lock();
     try {
-      readLock.lock();
       if (finishedStatus != null) {
         return finishedStatus.getDiagnostics();
       } else {
@@ -389,8 +389,8 @@ public String getLogURL() {
   @Override
   public int getContainerExitStatus() {
+    readLock.lock();
     try {
-      readLock.lock();
       if (finishedStatus != null) {
         return finishedStatus.getExitStatus();
       } else {
@@ -403,8 +403,8 @@ public int getContainerExitStatus() {
   @Override
   public ContainerState getContainerState() {
+    readLock.lock();
     try {
-      readLock.lock();
       if (finishedStatus != null) {
         return finishedStatus.getState();
       } else {
@@ -417,8 +417,8 @@ public ContainerState getContainerState() {
   @Override
   public List getResourceRequests() {
+    readLock.lock();
     try {
-      readLock.lock();
       return resourceRequests;
     } finally {
       readLock.unlock();
@@ -426,8 +426,8 @@ public ContainerState getContainerState() {
   }

   public void setResourceRequests(List requests) {
+    writeLock.lock();
     try {
-      writeLock.lock();
       this.resourceRequests = requests;
     } finally {
       writeLock.unlock();
@@ -441,8 +441,8 @@ public String toString() {
   @Override
   public boolean isAMContainer() {
+    readLock.lock();
     try {
-      readLock.lock();
       return isAMContainer;
     } finally {
       readLock.unlock();
@@ -450,8 +450,8 @@ public boolean isAMContainer() {
   }

   public void setAMContainer(boolean isAMContainer) {
+    writeLock.lock();
     try {
-      writeLock.lock();
       this.isAMContainer = isAMContainer;
     } finally {
       writeLock.unlock();
@@ -473,8 +473,8 @@ public void handle(RMContainerEvent event) {
       LOG.debug("Processing " + event.getContainerId() + " of type " + event
           .getType());
     }
+    writeLock.lock();
     try {
-      writeLock.lock();
       RMContainerState oldState = getState();
       try {
         stateMachine.doTransition(event.getType(), event);
@@ -771,8 +771,8 @@ public ContainerReport createContainerReport() {
   @Override
   public String getNodeHttpAddress() {
+    readLock.lock();
     try {
-      readLock.lock();
       if (container.getNodeHttpAddress() != null) {
         StringBuilder httpAddress = new StringBuilder();
         httpAddress.append(WebAppUtils.getHttpSchemePrefix(rmContext
@@ -842,8 +842,8 @@ public boolean isRemotelyAllocated() {
   @Override
   public Resource getAllocatedOrReservedResource() {
+    readLock.lock();
     try {
-      readLock.lock();
       if (getState().equals(RMContainerState.RESERVED)) {
         return getReservedResource();
       } else {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index 1f121f8..5f33666 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -655,8 +655,8 @@ public void resetLastNodeHeartBeatResponse() {
   public void handle(RMNodeEvent event) {
     LOG.debug("Processing " + event.getNodeId() + " of type "
         + event.getType());
+    writeLock.lock();
     try {
-      writeLock.lock();
       NodeState oldState = getState();
       try {
         stateMachine.doTransition(event.getType(), event);
@@ -1475,9 +1475,8 @@ private void handleLogAggregationStatus(
   @Override
   public List pullNewlyIncreasedContainers() {
+    writeLock.lock();
     try {
-      writeLock.lock();
-
       if (nmReportedIncreasedContainers.isEmpty()) {
         return Collections.EMPTY_LIST;
       } else {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index d506f4d..1cdf74b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -273,8 +273,8 @@ public long getLastNodeUpdateTime() {
   protected void containerLaunchedOnNode(
       ContainerId containerId, SchedulerNode node) {
+    readLock.lock();
     try {
-      readLock.lock();
       // Get the application for the finished container
       SchedulerApplicationAttempt application =
           getCurrentAttemptForContainer(containerId);
@@ -414,8 +414,8 @@ private void killOrphanContainerOnNode(RMNode node,
   public void recoverContainersOnNode(List containerReports,
       RMNode nm) {
+    writeLock.lock();
     try {
-      writeLock.lock();
       if (!rmContext.isWorkPreservingRecoveryEnabled()
           || containerReports == null || (containerReports != null
           && containerReports.isEmpty())) {
@@ -689,8 +689,8 @@ public N getSchedulerNode(NodeId nodeId) {
   @Override
   public void moveAllApps(String sourceQueue, String destQueue)
       throws YarnException {
+    writeLock.lock();
     try {
-      writeLock.lock();
       // check if destination queue is a valid leaf queue
       try {
         getQueueInfo(destQueue, false, false);
@@ -720,8 +720,8 @@ public void moveAllApps(String sourceQueue, String destQueue)
   @Override
   public void killAllAppsInQueue(String queueName)
       throws YarnException {
+    writeLock.lock();
     try {
-      writeLock.lock();
       // check if queue is a valid
       List apps = getAppsInQueue(queueName);
       if (apps == null) {
@@ -746,8 +746,8 @@ public void killAllAppsInQueue(String queueName)
    */
   public void updateNodeResource(RMNode nm,
       ResourceOption resourceOption) {
+    writeLock.lock();
     try {
-      writeLock.lock();
       SchedulerNode node = getSchedulerNode(nm.getNodeID());
       Resource newResource = resourceOption.getResource();
       Resource oldResource = node.getTotalResource();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
index 8acf7d5..ba23d92 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
@@ -124,8 +124,8 @@ public long getNewContainerId() {
   }

   public String getQueueName() {
+    this.readLock.lock();
     try {
-      this.readLock.lock();
       return queue.getQueueName();
     } finally {
       this.readLock.unlock();
@@ -173,9 +173,8 @@ public boolean updateResourceRequests(List requests,
     // Flag to track if any incoming requests update "ANY" requests
     boolean offswitchResourcesUpdated = false;

+    this.writeLock.lock();
     try {
-      this.writeLock.lock();
-
       // A map to group resource requests and dedup
       Map> dedupRequests = new HashMap<>();
@@ -353,8 +352,8 @@ public boolean getAndResetBlacklistChanged() {
    */
   public List getAllResourceRequests() {
     List ret = new ArrayList<>();
+    this.readLock.lock();
     try {
-      this.readLock.lock();
       for (SchedulingPlacementSet ps : schedulerKeyToPlacementSets.values()) {
         ret.addAll(ps.getResourceRequests().values());
       }
@@ -365,8 +364,8 @@ public boolean getAndResetBlacklistChanged() {
   }

   public PendingAsk getNextPendingAsk() {
+    readLock.lock();
     try {
-      readLock.lock();
       SchedulerRequestKey firstRequestKey = schedulerKeys.first();
       return getPendingAsk(firstRequestKey, ResourceRequest.ANY);
     } finally {
@@ -381,8 +380,8 @@ public PendingAsk getPendingAsk(SchedulerRequestKey schedulerKey) {
   public PendingAsk getPendingAsk(SchedulerRequestKey schedulerKey,
       String resourceName) {
+    this.readLock.lock();
     try {
-      this.readLock.lock();
       SchedulingPlacementSet ps = schedulerKeyToPlacementSets.get(schedulerKey);
       return (ps == null) ? PendingAsk.ZERO : ps.getPendingAsk(resourceName);
     } finally {
@@ -416,9 +415,8 @@ public boolean isPlaceBlacklisted(String resourceName,
   public List allocate(NodeType type, SchedulerNode node,
       SchedulerRequestKey schedulerKey, Container containerAllocated) {
+    writeLock.lock();
     try {
-      writeLock.lock();
-
       if (null != containerAllocated) {
         updateMetricsForAllocatedContainer(type, node, containerAllocated);
       }
@@ -437,8 +435,8 @@ public void checkForDeactivation() {
   }

   public void move(Queue newQueue) {
+    this.writeLock.lock();
     try {
-      this.writeLock.lock();
       QueueMetrics oldMetrics = queue.getMetrics();
       QueueMetrics newMetrics = newQueue.getMetrics();
       for (SchedulingPlacementSet ps : schedulerKeyToPlacementSets.values()) {
@@ -473,8 +471,8 @@ public void move(Queue newQueue) {
   public void stop() {
     // clear pending resources metrics for the application
+    this.writeLock.lock();
     try {
-      this.writeLock.lock();
       QueueMetrics metrics = queue.getMetrics();
       for (SchedulingPlacementSet ps : schedulerKeyToPlacementSets.values()) {
         PendingAsk ask = ps.getPendingAsk(ResourceRequest.ANY);
@@ -499,8 +497,8 @@ public void stop() {
   }

   public void setQueue(Queue queue) {
+    this.writeLock.lock();
     try {
-      this.writeLock.lock();
       this.queue = queue;
     } finally {
       this.writeLock.unlock();
@@ -525,8 +523,8 @@ public void transferStateFromPreviousAppSchedulingInfo(
   }

   public void recoverContainer(RMContainer rmContainer, String partition) {
+    this.writeLock.lock();
     try {
-      this.writeLock.lock();
       QueueMetrics metrics = queue.getMetrics();
       if (pending) {
         // If there was any container to recover, the application was
@@ -553,8 +551,8 @@ public void recoverContainer(RMContainer rmContainer, String partition) {
    */
   public boolean checkAllocation(NodeType type, SchedulerNode node,
       SchedulerRequestKey schedulerKey) {
+    readLock.lock();
     try {
-      readLock.lock();
       SchedulingPlacementSet ps = schedulerKeyToPlacementSets.get(schedulerKey);
       if (null == ps) {
         return false;
@@ -608,8 +606,8 @@ private void updateMetricsForAllocatedContainer(NodeType type,
    */
   public boolean canDelayTo(
       SchedulerRequestKey schedulerKey, String resourceName) {
+    this.readLock.lock();
     try {
-      this.readLock.lock();
       SchedulingPlacementSet ps =
           schedulerKeyToPlacementSets.get(schedulerKey);
       return (ps == null) || ps.canDelayTo(resourceName);
@@ -620,8 +618,8 @@ public boolean canDelayTo(
   public boolean acceptNodePartition(SchedulerRequestKey schedulerKey,
       String nodePartition, SchedulingMode schedulingMode) {
+    this.readLock.lock();
     try {
-      this.readLock.lock();
       SchedulingPlacementSet ps =
           schedulerKeyToPlacementSets.get(schedulerKey);
       return (ps != null) && ps.acceptNodePartition(nodePartition,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java
index 6f0c7d2..f111f37 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java
@@ -146,8 +146,8 @@ public void setUsed(Resource res) {
   }

   public void copyAllUsed(ResourceUsage other) {
+    writeLock.lock();
     try {
-      writeLock.lock();
       for (Entry entry : other.usages.entrySet()) {
         setUsed(entry.getKey(), Resources.clone(entry.getValue().getUsed()));
       }
@@ -327,8 +327,8 @@ private Resource _get(String label, ResourceType type) {
     if (label == null || label.equals(NL)) {
       return normalize(usageNoLabel.resArr[type.idx]);
     }
+    readLock.lock();
     try {
-      readLock.lock();
       UsageByLabel usage = usages.get(label);
       if (null == usage) {
         return Resources.none();
@@ -340,8 +340,8 @@ private Resource _get(String label, ResourceType type) {
   }

   private Resource _getAll(ResourceType type) {
+    readLock.lock();
     try {
-      readLock.lock();
       Resource allOfType = Resources.createResource(0);
       for (Map.Entry usageEntry : usages.entrySet()) {
         //all usages types are initialized
@@ -375,8 +375,8 @@ private UsageByLabel getAndAddIfMissing(String label) {
   }

   private void _set(String label, ResourceType type, Resource res) {
+    writeLock.lock();
     try {
-      writeLock.lock();
       UsageByLabel usage = getAndAddIfMissing(label);
       usage.resArr[type.idx] = res;
     } finally {
@@ -385,8 +385,8 @@ private void _set(String label, ResourceType type, Resource res) {
   }

   private void _inc(String label, ResourceType type, Resource res) {
+    writeLock.lock();
     try {
-      writeLock.lock();
       UsageByLabel usage = getAndAddIfMissing(label);
       Resources.addTo(usage.resArr[type.idx], res);
     } finally {
@@ -395,8 +395,8 @@ private void _inc(String label, ResourceType type, Resource res) {
   }

   private void _dec(String label, ResourceType type, Resource res) {
+    writeLock.lock();
     try {
-      writeLock.lock();
       UsageByLabel usage = getAndAddIfMissing(label);
       Resources.subtractFrom(usage.resArr[type.idx], res);
     } finally {
@@ -405,8 +405,8 @@ private void _dec(String label, ResourceType type, Resource res) {
   }

   public Resource getCachedDemand(String label) {
+    readLock.lock();
     try {
-      readLock.lock();
       Resource demand = Resources.createResource(0);
       Resources.addTo(demand, getCachedUsed(label));
       Resources.addTo(demand, getCachedPending(label));
@@ -418,8 +418,8 @@ public Resource getCachedDemand(String label) {
   @Override
   public String toString() {
+    readLock.lock();
     try {
-      readLock.lock();
       return usages.toString();
     } finally {
       readLock.unlock();
@@ -427,8 +427,8 @@ public String toString() {
   }

   public Set getNodePartitionsSet() {
+    readLock.lock();
     try {
-      readLock.lock();
       return usages.keySet();
     } finally {
       readLock.unlock();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index 8b2f9db..8a180a9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -246,8 +246,8 @@ public void setOpportunisticContainerContext(
    * @return live containers of the application
    */
   public Collection getLiveContainers() {
+    readLock.lock();
     try {
-      readLock.lock();
       return new ArrayList<>(liveContainers.values());
     } finally {
       readLock.unlock();
@@ -300,8 +300,8 @@ public long getNewContainerId() {
   public PendingAsk getPendingAsk(
       SchedulerRequestKey schedulerKey, String resourceName) {
+    readLock.lock();
     try {
-      readLock.lock();
       return appSchedulingInfo.getPendingAsk(schedulerKey, resourceName);
     } finally {
       readLock.unlock();
@@ -314,8 +314,8 @@ public int getOutstandingAsksCount(SchedulerRequestKey schedulerKey) {
   public int getOutstandingAsksCount(SchedulerRequestKey schedulerKey,
       String resourceName) {
+    readLock.lock();
     try {
-      readLock.lock();
       SchedulingPlacementSet ps = appSchedulingInfo.getSchedulingPlacementSet(
           schedulerKey);
       return ps == null ? 0 : ps.getOutstandingAsksCount(resourceName);
@@ -362,8 +362,8 @@ public RMContainer getRMContainer(ContainerId id) {
   public void addRMContainer(
       ContainerId id, RMContainer rmContainer) {
+    writeLock.lock();
     try {
-      writeLock.lock();
       liveContainers.put(id, rmContainer);
       if (rmContainer.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
         this.attemptOpportunisticResourceUsage.incUsed(
@@ -379,8 +379,8 @@ public void addRMContainer(
   }

   public void removeRMContainer(ContainerId containerId) {
+    writeLock.lock();
     try {
-      writeLock.lock();
       RMContainer rmContainer = liveContainers.remove(containerId);
       if (rmContainer != null) {
         if (rmContainer.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
@@ -428,8 +428,8 @@ public Queue getQueue() {
   public boolean updateResourceRequests(
       List requests) {
+    writeLock.lock();
     try {
-      writeLock.lock();
       if (!isStopped) {
         return appSchedulingInfo.updateResourceRequests(requests, false);
       }
@@ -441,8 +441,8 @@ public boolean updateResourceRequests(
   public void recoverResourceRequestsForContainer(
       List requests) {
+    writeLock.lock();
     try {
-      writeLock.lock();
       if (!isStopped) {
         appSchedulingInfo.updateResourceRequests(requests, true);
       }
@@ -452,8 +452,8 @@ public void recoverResourceRequestsForContainer(
   }

   public void stop(RMAppAttemptState rmAppAttemptFinalState) {
+    writeLock.lock();
     try {
-      writeLock.lock();
       // Cleanup all scheduling information
       isStopped = true;
       appSchedulingInfo.stop();
@@ -472,8 +472,8 @@ public boolean isStopped() {
    */
   public List getReservedContainers() {
     List list = new ArrayList<>();
+    readLock.lock();
     try {
-      readLock.lock();
       for (Entry> e :
           this.reservedContainers.entrySet()) {
         list.addAll(e.getValue().values());
@@ -488,8 +488,8 @@ public boolean isStopped() {
   public boolean reserveIncreasedContainer(SchedulerNode node,
       SchedulerRequestKey schedulerKey, RMContainer rmContainer,
       Resource reservedResource) {
+    writeLock.lock();
     try {
-      writeLock.lock();
       if (commonReserve(node, schedulerKey, rmContainer, reservedResource)) {
         attemptResourceUsage.incReserved(node.getPartition(),
             reservedResource);
         // succeeded
@@ -537,8 +537,8 @@ private boolean commonReserve(SchedulerNode node,
   public RMContainer reserve(SchedulerNode node,
       SchedulerRequestKey schedulerKey, RMContainer rmContainer,
       Container container) {
+    writeLock.lock();
     try {
-      writeLock.lock();
       // Create RMContainer if necessary
       if (rmContainer == null) {
         rmContainer = new RMContainerImpl(container, schedulerKey,
@@ -581,8 +581,8 @@ public Resource getHeadroom() {
   public int getNumReservedContainers(
       SchedulerRequestKey schedulerKey) {
+    readLock.lock();
     try {
-      readLock.lock();
       Map map = this.reservedContainers.get(
           schedulerKey);
       return (map == null) ? 0 : map.size();
@@ -594,8 +594,8 @@ public int getNumReservedContainers(
   @SuppressWarnings("unchecked")
   public void containerLaunchedOnNode(ContainerId containerId,
       NodeId nodeId) {
+    writeLock.lock();
     try {
-      writeLock.lock();
       // Inform the container
       RMContainer rmContainer = getRMContainer(containerId);
       if (rmContainer == null) {
@@ -614,8 +614,8 @@ public void containerLaunchedOnNode(ContainerId containerId,
   public void showRequests() {
     if (LOG.isDebugEnabled()) {
+      readLock.lock();
       try {
-        readLock.lock();
         for (SchedulerRequestKey schedulerKey : getSchedulerKeys()) {
           SchedulingPlacementSet ps = getSchedulingPlacementSet(schedulerKey);
           if (ps != null &&
@@ -701,8 +701,8 @@ private void updateNMToken(Container container) {
   // some reason like DNS unavailable, do not return this container and keep it
   // in the newlyAllocatedContainers waiting to be refetched.
   public List pullNewlyAllocatedContainers() {
+    writeLock.lock();
     try {
-      writeLock.lock();
       List returnContainerList = new ArrayList(
           newlyAllocatedContainers.size());
@@ -817,8 +817,8 @@ protected synchronized void addToNewlyAllocatedContainers(
         || ContainerUpdateType.PROMOTE_EXECUTION_TYPE == updateTpe)) {
       return updatedContainers;
     }
+    writeLock.lock();
     try {
-      writeLock.lock();
       Iterator> i =
           newlyUpdatedContainers.entrySet().iterator();
       while (i.hasNext()) {
@@ -862,8 +862,8 @@ protected synchronized void addToNewlyAllocatedContainers(
   }

   public List pullUpdatedNMTokens() {
+    writeLock.lock();
     try {
-      writeLock.lock();
       List returnList = new ArrayList<>(updatedNMTokens);
       updatedNMTokens.clear();
       return returnList;
@@ -881,8 +881,8 @@ public boolean isWaitingForAMContainer() {
   public void updateBlacklist(List blacklistAdditions,
       List blacklistRemovals) {
+    writeLock.lock();
     try {
-      writeLock.lock();
       if (!isStopped) {
         if (isWaitingForAMContainer()) {
           // The request is for the AM-container, and the AM-container is
@@ -901,8 +901,8 @@ public void updateBlacklist(List blacklistAdditions,
   }

   public boolean isPlaceBlacklisted(String resourceName) {
+    readLock.lock();
     try {
-      readLock.lock();
       boolean forAMContainer = isWaitingForAMContainer();
       return this.appSchedulingInfo.isPlaceBlacklisted(resourceName,
           forAMContainer);
@@ -999,8 +999,8 @@ private AggregateAppResourceUsage getRunningAggregateAppResourceUsage() {
   }

   public ApplicationResourceUsageReport getResourceUsageReport() {
+    writeLock.lock();
     try {
-      writeLock.lock();
       AggregateAppResourceUsage runningResourceUsage =
           getRunningAggregateAppResourceUsage();
       Resource usedResourceClone = Resources.clone(
@@ -1045,8 +1045,8 @@ public ApplicationResourceUsageReport getResourceUsageReport() {
   public void transferStateFromPreviousAttempt(
       SchedulerApplicationAttempt appAttempt) {
+    writeLock.lock();
     try {
-      writeLock.lock();
       this.liveContainers = appAttempt.getLiveContainersMap();
       // this.reReservations = appAttempt.reReservations;
       this.attemptResourceUsage.copyAllUsed(appAttempt.attemptResourceUsage);
@@ -1063,8 +1063,8 @@ public void transferStateFromPreviousAttempt(
   }

   public void move(Queue newQueue) {
+    writeLock.lock();
     try {
-      writeLock.lock();
       QueueMetrics oldMetrics = queue.getMetrics();
       QueueMetrics newMetrics = newQueue.getMetrics();
       String newQueueName = newQueue.getQueueName();
@@ -1100,8 +1100,8 @@ public void move(Queue newQueue) {
   public void recoverContainer(SchedulerNode node,
       RMContainer rmContainer) {
+    writeLock.lock();
     try {
-      writeLock.lock();
       // recover app scheduling info
       appSchedulingInfo.recoverContainer(rmContainer, node.getPartition());
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index 5fbdead..838f5ce 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -236,8 +236,8 @@ public boolean hasAccess(QueueACL acl, UserGroupInformation user) {
    * @param maximumCapacity new max capacity
    */
  void setMaxCapacity(float maximumCapacity) {
+    writeLock.lock();
    try {
-      writeLock.lock();
      // Sanity check
      CSQueueUtils.checkMaxCapacity(getQueueName(),
          queueCapacities.getCapacity(), maximumCapacity);
@@ -260,8 +260,8 @@ public String getDefaultNodeLabelExpression() {
   void setupQueueConfigs(Resource clusterResource)
       throws IOException {
+    writeLock.lock();
     try {
-      writeLock.lock();
       // get labels
       this.accessibleLabels =
           csContext.getConfiguration().getAccessibleNodeLabels(getQueuePath());
@@ -488,8 +488,8 @@ public Resource getMinimumAllocation() {
   void allocateResource(Resource clusterResource,
       Resource resource, String nodePartition) {
+    writeLock.lock();
     try {
-      writeLock.lock();
       queueUsage.incUsed(nodePartition, resource);

       ++numContainers;
@@ -503,8 +503,8 @@ void allocateResource(Resource clusterResource,
   protected void releaseResource(Resource clusterResource,
       Resource resource, String nodePartition) {
+    writeLock.lock();
     try {
-      writeLock.lock();
       queueUsage.decUsed(nodePartition, resource);

       CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
@@ -523,8 +523,8 @@ public boolean getReservationContinueLooking() {
   @Private
   public Map getACLs() {
+    readLock.lock();
     try {
-      readLock.lock();
       return acls;
     } finally {
       readLock.unlock();
@@ -624,8 +624,8 @@ public boolean hasChildQueues() {
   boolean canAssignToThisQueue(Resource clusterResource,
       String nodePartition, ResourceLimits currentResourceLimits,
       Resource resourceCouldBeUnreserved, SchedulingMode schedulingMode) {
+    readLock.lock();
     try {
-      readLock.lock();
       // Get current limited resource:
       // - When doing RESPECT_PARTITION_EXCLUSIVITY allocation, we will respect
       // queues' max capacity.
@@ -889,9 +889,8 @@ public boolean accept(Resource cluster,
     Resource netAllocated = Resources.subtract(required,
         request.getTotalReleasedResource());

+    readLock.lock();
     try {
-      readLock.lock();
-
       String partition = schedulerContainer.getNodePartition();
       Resource maxResourceLimit;
       if (allocation.getSchedulingMode()
@@ -940,8 +939,8 @@ public void updateQueueState(QueueState queueState) {
   @Override
   public void activeQueue() throws YarnException {
+    this.writeLock.lock();
     try {
-      this.writeLock.lock();
       if (getState() == QueueState.RUNNING) {
         LOG.info("The specified queue:" + queueName
             + " is already in the RUNNING state.");
@@ -964,8 +963,8 @@ public void activeQueue() throws YarnException {
   }

   protected void appFinished() {
+    this.writeLock.lock();
     try {
-      this.writeLock.lock();
       if (getState() == QueueState.DRAINING) {
         if (getNumApplications() == 0) {
           updateQueueState(QueueState.STOPPED);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 2ccaf63..1eb4de6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -284,8 +284,8 @@ public void setRMContext(RMContext rmContext) {
   @VisibleForTesting
   void initScheduler(Configuration configuration) throws
       IOException {
+    writeLock.lock();
     try {
-      writeLock.lock();
       this.conf = loadCapacitySchedulerConfiguration(configuration);
       validateConf(this.conf);
       this.minimumAllocation = this.conf.getMinimumAllocation();
@@ -337,8 +337,8 @@ void initScheduler(Configuration configuration) throws
   }

   private void startSchedulerThreads() {
+    writeLock.lock();
     try {
-      writeLock.lock();
       activitiesManager.start();
       if (scheduleAsynchronously) {
         Preconditions.checkNotNull(asyncSchedulerThreads,
@@ -369,8 +369,8 @@ public void serviceStart() throws Exception {
   @Override
   public void serviceStop() throws Exception {
+    writeLock.lock();
     try {
-      writeLock.lock();
       if (scheduleAsynchronously && asyncSchedulerThreads != null) {
         for (Thread t : asyncSchedulerThreads) {
           t.interrupt();
@@ -389,8 +389,8 @@ public void serviceStop() throws Exception {
   @Override
   public void reinitialize(Configuration newConf, RMContext rmContext)
       throws IOException {
+    writeLock.lock();
     try {
-      writeLock.lock();
       Configuration configuration = new Configuration(newConf);
       CapacitySchedulerConfiguration oldConf = this.conf;
       this.conf = loadCapacitySchedulerConfiguration(configuration);
@@ -503,8 +503,8 @@ public void run() {
           ResourceCommitRequest request =
               backlogs.take();

+          cs.writeLock.lock();
           try {
-            cs.writeLock.lock();
             cs.tryCommit(cs.getClusterResource(), request);
           } finally {
             cs.writeLock.unlock();
@@ -529,8 +529,8 @@ public int getPendingBacklogs() {
   @VisibleForTesting
   public UserGroupMappingPlacementRule getUserGroupMappingPlacementRule()
       throws IOException {
+    readLock.lock();
     try {
-      readLock.lock();
       boolean overrideWithQueueMappings = conf.getOverrideWithQueueMappings();
       LOG.info(
           "Initialized queue mappings, override: " + overrideWithQueueMappings);
@@ -631,8 +631,8 @@ public CSQueue getQueue(String queueName) {
   private void addApplicationOnRecovery(
       ApplicationId applicationId, String queueName, String user,
       Priority priority) {
+    writeLock.lock();
     try {
-      writeLock.lock();
       CSQueue queue = getQueue(queueName);
       if (queue == null) {
         //During a restart, this indicates a queue was removed, which is
@@ -699,8 +699,8 @@ private void addApplicationOnRecovery(
   private void addApplication(ApplicationId applicationId, String queueName,
       String user, Priority priority) {
+    writeLock.lock();
     try {
-      writeLock.lock();
       if (isSystemAppsLimitReached()) {
        String message = "Maximum system application limit reached,"
            + "cannot accept submission of application: " + applicationId;
@@ -757,8 +757,8 @@ private void addApplicationAttempt(
       ApplicationAttemptId applicationAttemptId,
       boolean transferStateFromPreviousAttempt,
       boolean isAttemptRecovering) {
+    writeLock.lock();
     try {
-      writeLock.lock();
       SchedulerApplication application = applications.get(
           applicationAttemptId.getApplicationId());
       if (application == null) {
@@ -810,8 +810,8 @@ private void addApplicationAttempt(
   private void doneApplication(ApplicationId applicationId,
       RMAppState finalState) {
+    writeLock.lock();
     try {
-      writeLock.lock();
       SchedulerApplication application = applications.get(
           applicationId);
       if (application == null) {
@@ -837,8 +837,8 @@ private void doneApplication(ApplicationId applicationId,
   private void doneApplicationAttempt(
       ApplicationAttemptId applicationAttemptId,
       RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers) {
+    writeLock.lock();
     try {
-      writeLock.lock();
       LOG.info("Application Attempt " + applicationAttemptId + " is done."
           + " finalState=" + rmAppAttemptFinalState);
@@ -988,8 +988,8 @@ public QueueInfo getQueueInfo(String queueName,
   @Override
   protected void nodeUpdate(RMNode rmNode) {
+    readLock.lock();
     try {
-      readLock.lock();
       setLastNodeUpdateTime(Time.now());
       super.nodeUpdate(rmNode);
     } finally {
@@ -998,8 +998,8 @@ protected void nodeUpdate(RMNode rmNode) {
     // Try to do scheduling
     if (!scheduleAsynchronously) {
+      writeLock.lock();
       try {
-        writeLock.lock();
         ActivitiesLogger.NODE.startNodeUpdateRecording(activitiesManager,
             rmNode.getNodeID());
@@ -1022,8 +1022,8 @@ protected void nodeUpdate(RMNode rmNode) {
    */
   private void updateNodeAndQueueResource(RMNode nm,
       ResourceOption resourceOption) {
+    writeLock.lock();
     try {
-      writeLock.lock();
       updateNodeResource(nm, resourceOption);
       Resource clusterResource = getClusterResource();
       getRootQueue().updateClusterResource(clusterResource,
@@ -1492,8 +1492,8 @@ public void handle(SchedulerEvent event) {
    */
   private void updateNodeLabelsAndQueueResource(
       NodeLabelsUpdateSchedulerEvent labelUpdateEvent) {
+    writeLock.lock();
     try {
-      writeLock.lock();
       for (Entry> entry : labelUpdateEvent
           .getUpdatedNodeToLabels().entrySet()) {
         NodeId id = entry.getKey();
@@ -1509,8 +1509,8 @@ private void updateNodeLabelsAndQueueResource(
   }

   private void addNode(RMNode nodeManager) {
+    writeLock.lock();
     try {
-      writeLock.lock();
       FiCaSchedulerNode schedulerNode = new FiCaSchedulerNode(nodeManager,
           usePortForNodeName, nodeManager.getNodeLabels());
       nodeTracker.addNode(schedulerNode);
@@ -1540,8 +1540,8 @@ private void addNode(RMNode nodeManager) {
   }

   private void removeNode(RMNode nodeInfo) {
+    writeLock.lock();
     try {
-      writeLock.lock();
       // update this node to node label manager
       if (labelManager != null) {
         labelManager.deactivateNode(nodeInfo.getNodeID());
@@ -1679,8 +1679,8 @@ public void markContainerForPreemption(ApplicationAttemptId aid,
   public void markContainerForKillable(
       RMContainer killableContainer) {
+    writeLock.lock();
     try {
-      writeLock.lock();
       if (LOG.isDebugEnabled()) {
         LOG.debug(SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE + ": container"
             + killableContainer.toString());
@@ -1715,8 +1715,8 @@ public void markContainerForKillable(
   private void markContainerForNonKillable(
       RMContainer nonKillableContainer) {
+    writeLock.lock();
     try {
-      writeLock.lock();
       if (LOG.isDebugEnabled()) {
         LOG.debug(
             SchedulerEventType.MARK_CONTAINER_FOR_NONKILLABLE + ": container"
@@ -1801,8 +1801,8 @@ private String getDefaultReservationQueueName(String planQueueName) {
   private String resolveReservationQueueName(String queueName,
       ApplicationId applicationId, ReservationId reservationID,
       boolean isRecovering) {
+    readLock.lock();
     try {
-      readLock.lock();
       CSQueue queue = getQueue(queueName);
       // Check if the queue is a plan queue
       if ((queue == null) || !(queue instanceof PlanQueue)) {
@@ -1852,8 +1852,8 @@ private String resolveReservationQueueName(String queueName,
   @Override
   public void removeQueue(String queueName)
       throws SchedulerDynamicEditException {
+    writeLock.lock();
     try {
-      writeLock.lock();
       LOG.info("Removing queue: " + queueName);
       CSQueue q = this.getQueue(queueName);
       if (!(q instanceof ReservationQueue)) {
@@ -1882,8 +1882,8 @@ public void removeQueue(String queueName)
   @Override
   public void addQueue(Queue queue)
       throws SchedulerDynamicEditException {
+    writeLock.lock();
     try {
-      writeLock.lock();
       if (!(queue instanceof ReservationQueue)) {
         throw new SchedulerDynamicEditException(
             "Queue " + queue.getQueueName() + " is not a ReservationQueue");
@@ -1911,8 +1911,8 @@ public void addQueue(Queue queue)
   @Override
   public void setEntitlement(String inQueue, QueueEntitlement entitlement)
       throws YarnException {
+    writeLock.lock();
     try {
-      writeLock.lock();
       LeafQueue queue = this.queueManager.getAndCheckLeafQueue(inQueue);
       ParentQueue parent = (ParentQueue) queue.getParent();
@@ -1960,8 +1960,8 @@ public void setEntitlement(String inQueue, QueueEntitlement entitlement)
   @Override
   public String moveApplication(ApplicationId appId,
       String targetQueueName) throws YarnException {
+    writeLock.lock();
     try {
-      writeLock.lock();
       SchedulerApplication application =
           applications.get(appId);
       if (application == null) {
@@ -2012,8 +2012,8 @@ public String moveApplication(ApplicationId appId,
   @Override
   public void preValidateMoveApplication(ApplicationId appId,
       String newQueue) throws YarnException {
+    writeLock.lock();
     try {
-      writeLock.lock();
       SchedulerApplication application =
           applications.get(appId);
       if (application == null) {
@@ -2178,8 +2178,8 @@ public Priority updateApplicationPriority(Priority newPriority,
       ApplicationId applicationId, SettableFuture future,
       UserGroupInformation user)
       throws YarnException {
+    writeLock.lock();
     try {
-      writeLock.lock();
       Priority appPriority = null;
       SchedulerApplication application = applications
           .get(applicationId);
@@ -2458,8 +2458,8 @@ public CapacitySchedulerQueueManager getCapacitySchedulerQueueManager() {
    */
   public boolean moveReservedContainer(RMContainer toBeMovedContainer,
       FiCaSchedulerNode targetNode) {
+    writeLock.lock();
     try {
-      writeLock.lock();
       if (LOG.isDebugEnabled()) {
         LOG.debug("Trying to move container="
             + toBeMovedContainer + " to node="
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 2e502b7..f839293 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -152,8 +152,8 @@ public LeafQueue(CapacitySchedulerContext cs,
   protected void setupQueueConfigs(Resource clusterResource)
       throws IOException {
+    writeLock.lock();
     try {
-      writeLock.lock();
       super.setupQueueConfigs(clusterResource);
       this.lastClusterResource = clusterResource;
@@ -368,8 +368,8 @@ void setUserLimitFactor(float userLimitFactor) {
   @Override
   public int getNumApplications() {
+    readLock.lock();
     try {
-      readLock.lock();
       return getNumPendingApplications() + getNumActiveApplications();
     } finally {
       readLock.unlock();
@@ -377,8 +377,8 @@ public int getNumApplications() {
   }

   public int getNumPendingApplications() {
+    readLock.lock();
     try {
-      readLock.lock();
       return pendingOrderingPolicy.getNumSchedulableEntities();
     } finally {
       readLock.unlock();
@@ -386,8 +386,8 @@ public int getNumPendingApplications() {
   }

   public int getNumActiveApplications() {
+    readLock.lock();
     try {
-      readLock.lock();
       return orderingPolicy.getNumSchedulableEntities();
     } finally {
       readLock.unlock();
@@ -396,8 +396,8 @@ public int getNumActiveApplications() {
   @Private
   public int getNumPendingApplications(String user) {
+    readLock.lock();
     try {
-      readLock.lock();
       User u = getUser(user);
       if (null == u) {
         return 0;
@@ -410,8 +410,8 @@ public int getNumPendingApplications(String user) {
   @Private
   public int getNumActiveApplications(String user) {
+    readLock.lock();
     try {
-      readLock.lock();
       User u = getUser(user);
       if (null == u) {
         return 0;
@@ -442,8 +442,8 @@ public QueueInfo getQueueInfo(
   @Override
   public List getQueueUserAclInfo(UserGroupInformation user) {
+    readLock.lock();
     try {
-      readLock.lock();
       QueueUserACLInfo userAclInfo = recordFactory.newRecordInstance(
           QueueUserACLInfo.class);
       List operations = new ArrayList<>();
@@ -463,8 +463,8 @@ public QueueInfo getQueueInfo(
   }

   public String toString() {
+    readLock.lock();
     try {
-      readLock.lock();
       return queueName + ": " + "capacity=" + queueCapacities.getCapacity()
           + ", " + "absoluteCapacity=" + queueCapacities.getAbsoluteCapacity()
           + ", " + "usedResources=" + queueUsage.getUsed() + ", "
@@ -484,8 +484,8 @@ public User getUser(String userName) {
   @Private
   public List getPriorityACLs() {
+    readLock.lock();
     try {
-      readLock.lock();
       return new ArrayList<>(priorityAcls);
     } finally {
       readLock.unlock();
@@ -496,8 +496,8 @@ public User getUser(String userName) {
   @Override
   public void reinitialize(
       CSQueue newlyParsedQueue, Resource clusterResource) throws IOException {
+    writeLock.lock();
     try {
-      writeLock.lock();
       // Sanity check
       if (!(newlyParsedQueue instanceof LeafQueue) || !newlyParsedQueue
           .getQueuePath().equals(getQueuePath())) {
@@ -535,8 +535,8 @@ public void reinitialize(
   public void submitApplicationAttempt(FiCaSchedulerApp application,
       String userName) {
     // Careful! Locking order is important!
+ writeLock.lock(); try { - writeLock.lock(); // TODO, should use getUser, use this method just to avoid UT failure // which is caused by wrong invoking order, will fix UT separately @@ -575,8 +575,8 @@ public void submitApplication(ApplicationId applicationId, String userName, public void validateSubmitApplication(ApplicationId applicationId, String userName, String queue) throws AccessControlException { + writeLock.lock(); try { - writeLock.lock(); // Check if the queue is accepting jobs if (getState() != QueueState.RUNNING) { String msg = "Queue " + getQueuePath() @@ -644,8 +644,8 @@ public Resource getUserAMResourceLimitPerPartition( if (userName != null && getUser(userName) != null) { userWeight = getUser(userName).getWeight(); } + readLock.lock(); try { - readLock.lock(); /* * The user am resource limit is based on the same approach as the user * limit (as it should represent a subset of that). This means that it uses @@ -680,8 +680,8 @@ public Resource getUserAMResourceLimitPerPartition( public Resource calculateAndGetAMResourceLimitPerPartition( String nodePartition) { + writeLock.lock(); try { - writeLock.lock(); /* * For non-labeled partition, get the max value from resources currently * available to the queue and the absolute resources guaranteed for the @@ -726,8 +726,8 @@ public Resource calculateAndGetAMResourceLimitPerPartition( } private void activateApplications() { + writeLock.lock(); try { - writeLock.lock(); // limit of allowed resource usage for application masters Map userAmPartitionLimit = new HashMap(); @@ -848,8 +848,8 @@ private void activateApplications() { private void addApplicationAttempt(FiCaSchedulerApp application, User user) { + writeLock.lock(); try { - writeLock.lock(); // Accept user.submitApplication(); getPendingAppsOrderingPolicy().addSchedulableEntity(application); @@ -901,8 +901,8 @@ public void finishApplicationAttempt(FiCaSchedulerApp application, String queue) private void removeApplicationAttempt( FiCaSchedulerApp application, String userName) { + writeLock.lock(); try { - writeLock.lock(); // TODO, should use getUser, use this method just to avoid UT failure // which is caused by wrong invoking order, will fix UT separately @@ -1146,8 +1146,8 @@ public boolean accept(Resource cluster, // Do not check limits when allocation from a reserved container if (allocation.getAllocateFromReservedContainer() == null) { + readLock.lock(); try { - readLock.lock(); FiCaSchedulerApp app = schedulerContainer.getSchedulerApplicationAttempt(); String username = app.getUser(); @@ -1240,8 +1240,8 @@ public void apply(Resource cluster, releaseContainers(cluster, request); + writeLock.lock(); try { - writeLock.lock(); if (request.anythingAllocatedOrReserved()) { ContainerAllocationProposal @@ -1454,8 +1454,8 @@ public Resource getResourceLimitForAllUsers(String userName, protected boolean canAssignToUser(Resource clusterResource, String userName, Resource limit, FiCaSchedulerApp application, String nodePartition, ResourceLimits currentResourceLimits) { + readLock.lock(); try { - readLock.lock(); User user = getUser(userName); currentResourceLimits.setAmountNeededUnreserve(Resources.none()); @@ -1530,8 +1530,8 @@ private void updateSchedulerHealthForCompletedContainer( */ public void recalculateQueueUsageRatio(Resource clusterResource, String nodePartition) { + writeLock.lock(); try { - writeLock.lock(); ResourceUsage queueResourceUsage = getQueueResourceUsage(); if (nodePartition == null) { @@ -1560,8 +1560,8 @@ public void completedContainer(Resource 
clusterResource, boolean removed = false; // Careful! Locking order is important! + writeLock.lock(); try { - writeLock.lock(); Container container = rmContainer.getContainer(); // Inform the application & the node @@ -1607,8 +1607,8 @@ public void completedContainer(Resource clusterResource, void allocateResource(Resource clusterResource, SchedulerApplicationAttempt application, Resource resource, String nodePartition, RMContainer rmContainer) { + writeLock.lock(); try { - writeLock.lock(); super.allocateResource(clusterResource, resource, nodePartition); // handle ignore exclusivity container @@ -1652,8 +1652,8 @@ void allocateResource(Resource clusterResource, void releaseResource(Resource clusterResource, FiCaSchedulerApp application, Resource resource, String nodePartition, RMContainer rmContainer) { + writeLock.lock(); try { - writeLock.lock(); super.releaseResource(clusterResource, resource, nodePartition); // handle ignore exclusivity container @@ -1712,8 +1712,8 @@ private void updateCurrentResourceLimits( @Override public void updateClusterResource(Resource clusterResource, ResourceLimits currentResourceLimits) { + writeLock.lock(); try { - writeLock.lock(); updateCurrentResourceLimits(currentResourceLimits, clusterResource); lastClusterResource = clusterResource; @@ -1789,8 +1789,8 @@ public void recoverContainer(Resource clusterResource, } // Careful! Locking order is important! + writeLock.lock(); try { - writeLock.lock(); FiCaSchedulerNode node = scheduler.getNode( rmContainer.getContainer().getNodeId()); allocateResource(clusterResource, attempt, @@ -1853,8 +1853,8 @@ public void recoverContainer(Resource clusterResource, public Resource getTotalPendingResourcesConsideringUserLimit( Resource clusterResources, String partition, boolean deductReservedFromPending) { + readLock.lock(); try { - readLock.lock(); Map userNameToHeadroom = new HashMap<>(); Resource totalPendingConsideringUserLimit = Resource.newInstance(0, 0); @@ -1897,8 +1897,8 @@ public Resource getTotalPendingResourcesConsideringUserLimit( @Override public void collectSchedulerApplications( Collection apps) { + readLock.lock(); try { - readLock.lock(); for (FiCaSchedulerApp pendingApp : pendingOrderingPolicy .getSchedulableEntities()) { apps.add(pendingApp.getApplicationAttemptId()); @@ -1957,8 +1957,8 @@ public void detachContainer(Resource clusterResource, public Map> getIgnoreExclusivityRMContainers() { Map> clonedMap = new HashMap<>(); + readLock.lock(); try { - readLock.lock(); for (Map.Entry> entry : ignorePartitionExclusivityRMContainers .entrySet()) { @@ -1991,8 +1991,8 @@ public void setMaxApplications(int maxApplications) { void setOrderingPolicy( OrderingPolicy orderingPolicy) { + writeLock.lock(); try { - writeLock.lock(); if (null != this.orderingPolicy) { orderingPolicy.addAllSchedulableEntities( this.orderingPolicy.getSchedulableEntities()); @@ -2010,8 +2010,8 @@ public Priority getDefaultApplicationPriority() { public void updateApplicationPriority(SchedulerApplication app, Priority newAppPriority) { + writeLock.lock(); try { - writeLock.lock(); FiCaSchedulerApp attempt = app.getCurrentAppAttempt(); boolean isActive = orderingPolicy.removeSchedulableEntity(attempt); if (!isActive) { @@ -2062,8 +2062,8 @@ public Resource getClusterResource() { @Override public void stopQueue() { + writeLock.lock(); try { - writeLock.lock(); if (getNumApplications() > 0) { updateQueueState(QueueState.DRAINING); } else {
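The LeafQueue hunks above all apply a single idiom: lock() is called immediately before the try block instead of as the first statement inside it. The Lock javadoc recommends exactly this shape, because if acquisition is the first statement inside try and it fails, the finally block still runs and unlock() is invoked on a lock the thread never held, raising IllegalMonitorStateException and masking the original failure. A minimal sketch of the before and after shapes, using a hypothetical Counter class rather than the Hadoop queue classes:

import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical stand-in for the queue classes in this patch.
class Counter {
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  private long value;

  // Shape the patch removes: if lock() threw, finally would still run
  // and unlock() would fail on a lock this thread never acquired,
  // hiding the original error.
  long getBroken() {
    try {
      lock.readLock().lock();
      return value;
    } finally {
      lock.readLock().unlock();
    }
  }

  // Shape the patch introduces: acquire first, so the try/finally pair
  // only ever guards a lock that is actually held.
  long get() {
    lock.readLock().lock();
    try {
      return value;
    } finally {
      lock.readLock().unlock();
    }
  }

  void increment() {
    lock.writeLock().lock();
    try {
      value++;
    } finally {
      lock.writeLock().unlock();
    }
  }
}

With the lock held before entry, try/finally guards exactly the critical section and nothing else.

diff --git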
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index f6ada4f..f79f04a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -119,8 +119,8 @@ private String getQueueOrderingPolicyConfigName() { protected void setupQueueConfigs(Resource clusterResource) throws IOException { + writeLock.lock(); try { - writeLock.lock(); super.setupQueueConfigs(clusterResource); StringBuilder aclsString = new StringBuilder(); for (Map.Entry e : acls.entrySet()) { @@ -159,8 +159,8 @@ protected void setupQueueConfigs(Resource clusterResource) private static float PRECISION = 0.0005f; // 0.05% precision void setChildQueues(Collection childQueues) { + writeLock.lock(); try { - writeLock.lock(); // Validate float childCapacities = 0; for (CSQueue queue : childQueues) { @@ -209,8 +209,8 @@ public String getQueuePath() { @Override public QueueInfo getQueueInfo( boolean includeChildQueues, boolean recursive) { + readLock.lock(); try { - readLock.lock(); QueueInfo queueInfo = getQueueInfo(); List childQueuesInfo = new ArrayList<>(); @@ -231,8 +231,8 @@ public QueueInfo getQueueInfo( private QueueUserACLInfo getUserAclInfo( UserGroupInformation user) { + readLock.lock(); try { - readLock.lock(); QueueUserACLInfo userAclInfo = recordFactory.newRecordInstance( QueueUserACLInfo.class); List operations = new ArrayList(); @@ -254,8 +254,8 @@ private QueueUserACLInfo getUserAclInfo( @Override public List getQueueUserAclInfo( UserGroupInformation user) { + readLock.lock(); try { - readLock.lock(); List userAcls = new ArrayList<>(); // Add parent queue acls @@ -287,8 +287,8 @@ public String toString() { @Override public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource) throws IOException { + writeLock.lock(); try { - writeLock.lock(); // Sanity check if (!(newlyParsedQueue instanceof ParentQueue) || !newlyParsedQueue .getQueuePath().equals(getQueuePath())) { @@ -380,8 +380,8 @@ public void reinitialize(CSQueue newlyParsedQueue, public void submitApplication(ApplicationId applicationId, String user, String queue) throws AccessControlException { + writeLock.lock(); try { - writeLock.lock(); // Sanity check validateSubmitApplication(applicationId, user, queue); @@ -405,8 +405,8 @@ public void submitApplication(ApplicationId applicationId, String user, public void validateSubmitApplication(ApplicationId applicationId, String userName, String queue) throws AccessControlException { + writeLock.lock(); try { - writeLock.lock(); if (queue.equals(queueName)) { throw new AccessControlException( "Cannot submit application " + "to non-leaf queue: " + queueName); @@ -437,8 +437,8 @@ public void finishApplicationAttempt(FiCaSchedulerApp application, private void addApplication(ApplicationId applicationId, String user) { + writeLock.lock(); try { - writeLock.lock(); ++numApplications; LOG.info( @@ -465,8 +465,8 @@ public void finishApplication(ApplicationId application, String user) 
{ private void removeApplication(ApplicationId applicationId, String user) { + writeLock.lock(); try { - writeLock.lock(); --numApplications; LOG.info("Application removed -" + " appId: " + applicationId + " user: " @@ -791,8 +791,8 @@ private void printChildQueues() { private void internalReleaseResource(Resource clusterResource, FiCaSchedulerNode node, Resource releasedResource) { + writeLock.lock(); try { - writeLock.lock(); super.releaseResource(clusterResource, releasedResource, node.getPartition()); @@ -828,8 +828,8 @@ public void completedContainer(Resource clusterResource, @Override public void updateClusterResource(Resource clusterResource, ResourceLimits resourceLimits) { + writeLock.lock(); try { - writeLock.lock(); // Update all children for (CSQueue childQueue : childQueues) { // Get ResourceLimits of child queue before assign containers @@ -853,8 +853,8 @@ public boolean hasChildQueues() { @Override public List getChildQueues() { + readLock.lock(); try { - readLock.lock(); return new ArrayList(childQueues); } finally { readLock.unlock(); @@ -870,8 +870,8 @@ public void recoverContainer(Resource clusterResource, } // Careful! Locking order is important! + writeLock.lock(); try { - writeLock.lock(); FiCaSchedulerNode node = scheduler.getNode( rmContainer.getContainer().getNodeId()); allocateResource(clusterResource, @@ -894,8 +894,8 @@ public ActiveUsersManager getAbstractUsersManager() { @Override public void collectSchedulerApplications( Collection apps) { + readLock.lock(); try { - readLock.lock(); for (CSQueue queue : childQueues) { queue.collectSchedulerApplications(apps); } @@ -950,8 +950,8 @@ public int getNumApplications() { void allocateResource(Resource clusterResource, Resource resource, String nodePartition) { + writeLock.lock(); try { - writeLock.lock(); super.allocateResource(clusterResource, resource, nodePartition); /** @@ -1037,8 +1037,8 @@ public void apply(Resource cluster, // Do not modify queue when allocation from reserved container if (allocation.getAllocateFromReservedContainer() == null) { + writeLock.lock(); try { - writeLock.lock(); // Book-keeping // Note: Update headroom to account for current allocation too... 
allocateResource(cluster, allocation.getAllocatedOrReservedResource(), @@ -1061,8 +1061,8 @@ public void apply(Resource cluster, @Override public void stopQueue() { + this.writeLock.lock(); try { - this.writeLock.lock(); if (getNumApplications() > 0) { updateQueueState(QueueState.DRAINING); } else { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java index 882262f..f92c0bb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java @@ -81,8 +81,8 @@ public PlanQueue(CapacitySchedulerContext cs, String queueName, @Override public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource) throws IOException { + writeLock.lock(); try { - writeLock.lock(); // Sanity check if (!(newlyParsedQueue instanceof PlanQueue) || !newlyParsedQueue .getQueuePath().equals(getQueuePath())) { @@ -121,8 +121,8 @@ public void reinitialize(CSQueue newlyParsedQueue, void addChildQueue(CSQueue newQueue) throws SchedulerDynamicEditException { + writeLock.lock(); try { - writeLock.lock(); if (newQueue.getCapacity() > 0) { throw new SchedulerDynamicEditException( "Queue " + newQueue + " being added has non zero capacity."); @@ -139,8 +139,8 @@ void addChildQueue(CSQueue newQueue) void removeChildQueue(CSQueue remQueue) throws SchedulerDynamicEditException { + writeLock.lock(); try { - writeLock.lock(); if (remQueue.getCapacity() > 0) { throw new SchedulerDynamicEditException( "Queue " + remQueue + " being removed has non zero capacity."); @@ -161,8 +161,8 @@ void removeChildQueue(CSQueue remQueue) } protected float sumOfChildCapacities() { + writeLock.lock(); try { - writeLock.lock(); float ret = 0; for (CSQueue l : childQueues) { ret += l.getCapacity(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java index cc4af3d..e29e6b3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java @@ -83,8 +83,8 @@ public String toString() { } private float _get(String label, CapacityType type) { + readLock.lock(); try { - readLock.lock(); Capacities cap = capacitiesMap.get(label); if (null == cap) { return LABEL_DOESNT_EXIST_CAP; @@ -96,8 +96,8 @@ private float _get(String label, CapacityType type) { } private void _set(String label, CapacityType type, float value) { + writeLock.lock(); try { - writeLock.lock(); Capacities cap = 
capacitiesMap.get(label); if (null == cap) { cap = new Capacities(); @@ -277,8 +277,8 @@ public void setAbsoluteReservedCapacity(String label, float value) { * configurable fields, and load new values */ public void clearConfigurableFields() { + writeLock.lock(); try { - writeLock.lock(); for (String label : capacitiesMap.keySet()) { _set(label, CapacityType.CAP, 0); _set(label, CapacityType.MAX_CAP, 0); @@ -291,8 +291,8 @@ public void clearConfigurableFields() { } public Set getExistingNodeLabels() { + readLock.lock(); try { - readLock.lock(); return new HashSet(capacitiesMap.keySet()); } finally { readLock.unlock(); @@ -301,8 +301,8 @@ public void clearConfigurableFields() { @Override public String toString() { + readLock.lock(); try { - readLock.lock(); return this.capacitiesMap.toString(); } finally { readLock.unlock(); @@ -310,8 +310,8 @@ public String toString() { } public Set getNodePartitionsSet() { + readLock.lock(); try { - readLock.lock(); return capacitiesMap.keySet(); } finally { readLock.unlock(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java index 3d1b731..fded9ed 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java @@ -53,8 +53,8 @@ public ReservationQueue(CapacitySchedulerContext cs, String queueName, @Override public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource) throws IOException { + writeLock.lock(); try { - writeLock.lock(); // Sanity check if (!(newlyParsedQueue instanceof ReservationQueue) || !newlyParsedQueue .getQueuePath().equals(getQueuePath())) { @@ -85,8 +85,8 @@ public void reinitialize(CSQueue newlyParsedQueue, */ public void setEntitlement(QueueEntitlement entitlement) throws SchedulerDynamicEditException { + writeLock.lock(); try { - writeLock.lock(); float capacity = entitlement.getCapacity(); if (capacity < 0 || capacity > 1.0f) { throw new SchedulerDynamicEditException( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java index 5f7d185..bb06fa3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java @@ -112,8 +112,8 @@ public UsageRatios() { } private void incUsageRatio(String label, float delta) { + writeLock.lock(); try { - writeLock.lock(); float usage = 0f; if (usageRatios.containsKey(label)) { usage = 
usageRatios.get(label); @@ -126,8 +126,8 @@ private void incUsageRatio(String label, float delta) { } private float getUsageRatio(String label) { + readLock.lock(); try { - readLock.lock(); Float f = usageRatios.get(label); if (null == f) { return 0.0f; @@ -139,8 +139,8 @@ private float getUsageRatio(String label) { } private void setUsageRatio(String label, float ratio) { + writeLock.lock(); try { - writeLock.lock(); usageRatios.put(label, ratio); } finally { writeLock.unlock(); @@ -177,8 +177,8 @@ public ResourceUsage getResourceUsage() { public float setAndUpdateUsageRatio(ResourceCalculator resourceCalculator, Resource resource, String nodePartition) { + writeLock.lock(); try { - writeLock.lock(); userUsageRatios.setUsageRatio(nodePartition, 0); return updateUsageRatio(resourceCalculator, resource, nodePartition); } finally { @@ -188,8 +188,8 @@ public float setAndUpdateUsageRatio(ResourceCalculator resourceCalculator, public float updateUsageRatio(ResourceCalculator resourceCalculator, Resource resource, String nodePartition) { + writeLock.lock(); try { - writeLock.lock(); float delta; float newRatio = Resources.ratio(resourceCalculator, getUsed(nodePartition), resource); @@ -356,9 +356,8 @@ public void userLimitNeedsRecompute() { // If latestVersionOfUsersState is negative due to overflow, ideally we need // to reset it. This method is invoked from UsersManager and LeafQueue and // all is happening within write/readLock. Below logic can help to set 0. + writeLock.lock(); try { - writeLock.lock(); - long value = latestVersionOfUsersState.incrementAndGet(); if (value < 0) { latestVersionOfUsersState.set(0); @@ -393,8 +392,8 @@ public User getUser(String userName) { * User Name */ public void removeUser(String userName) { + writeLock.lock(); try { - writeLock.lock(); this.users.remove(userName); // Remove user from active/non-active list as well. @@ -415,8 +414,8 @@ public void removeUser(String userName) { * @return User object */ public User getUserAndAddIfAbsent(String userName) { + writeLock.lock(); try { - writeLock.lock(); User u = getUser(userName); if (null == u) { u = new User(userName); @@ -493,8 +492,8 @@ public Resource getComputedResourceLimitForActiveUsers(String userName, Map userLimitPerSchedulingMode = preComputedActiveUserLimit .get(nodePartition); + writeLock.lock(); try { - writeLock.lock(); if (isRecomputeNeeded(schedulingMode, nodePartition, true)) { // recompute userLimitPerSchedulingMode = reComputeUserLimits(userName, @@ -551,8 +550,8 @@ public Resource getComputedResourceLimitForAllUsers(String userName, Map userLimitPerSchedulingMode = preComputedAllUserLimit .get(nodePartition); + writeLock.lock(); try { - writeLock.lock(); if (isRecomputeNeeded(schedulingMode, nodePartition, false)) { // recompute userLimitPerSchedulingMode = reComputeUserLimits(userName, @@ -599,8 +598,8 @@ private boolean isRecomputeNeeded(SchedulingMode schedulingMode, */ private void setLocalVersionOfUsersState(String nodePartition, SchedulingMode schedulingMode, boolean isActive) { + writeLock.lock(); try { - writeLock.lock(); Map> localVersionOfUsersState = (isActive) ? localVersionOfActiveUsersState : localVersionOfAllUsersState; @@ -623,8 +622,8 @@ private void setLocalVersionOfUsersState(String nodePartition, */ private long getLocalVersionOfUsersState(String nodePartition, SchedulingMode schedulingMode, boolean isActive) { + this.readLock.lock(); try { - this.readLock.lock(); Map> localVersionOfUsersState = (isActive) ? 
localVersionOfActiveUsersState : localVersionOfAllUsersState; @@ -809,8 +808,8 @@ private Resource computeUserLimit(String userName, Resource clusterResource, * Cluster Resource */ public void updateUsageRatio(String partition, Resource clusterResource) { + writeLock.lock(); try { - writeLock.lock(); Resource resourceByLabel = labelManager.getResourceByLabel(partition, clusterResource); float consumed = 0; @@ -836,9 +835,8 @@ private void incQueueUsageRatio(String nodePartition, float delta) { @Override public void activateApplication(String user, ApplicationId applicationId) { + this.writeLock.lock(); try { - this.writeLock.lock(); - Set userApps = usersApplications.get(user); if (userApps == null) { userApps = new HashSet(); @@ -864,9 +862,8 @@ public void activateApplication(String user, ApplicationId applicationId) { @Override public void deactivateApplication(String user, ApplicationId applicationId) { + this.writeLock.lock(); try { - this.writeLock.lock(); - Set userApps = usersApplications.get(user); if (userApps != null) { if (userApps.remove(applicationId)) { @@ -898,8 +895,8 @@ public int getNumActiveUsers() { float sumActiveUsersTimesWeights() { float count = 0.0f; + this.readLock.lock(); try { - this.readLock.lock(); for (String u : activeUsersSet) { count += getUser(u).getWeight(); } @@ -911,8 +908,8 @@ float sumActiveUsersTimesWeights() { float sumAllUsersTimesWeights() { float count = 0.0f; + this.readLock.lock(); try { - this.readLock.lock(); for (String u : users.keySet()) { count += getUser(u).getWeight(); } @@ -923,9 +920,8 @@ float sumAllUsersTimesWeights() { } private void updateActiveUsersResourceUsage(String userName) { + this.writeLock.lock(); try { - this.writeLock.lock(); - // For UT case: We might need to add the user to users list. User user = getUserAndAddIfAbsent(userName); ResourceUsage resourceUsage = user.getResourceUsage(); @@ -962,9 +958,8 @@ private void updateActiveUsersResourceUsage(String userName) { } private void updateNonActiveUsersResourceUsage(String userName) { + this.writeLock.lock(); try { - this.writeLock.lock(); - // For UT case: We might need to add the user to users list. 
User user = getUser(userName); if (user == null) return; @@ -1031,8 +1026,8 @@ private ResourceUsage getTotalResourceUsagePerUser(String userName) { */ public User updateUserResourceUsage(String userName, Resource resource, String nodePartition, boolean isAllocate) { + this.writeLock.lock(); try { - this.writeLock.lock(); // TODO, should use getUser, use this method just to avoid UT failure // which is caused by wrong invoking order, will fix UT separately @@ -1078,8 +1073,8 @@ private void updateResourceUsagePerUser(User user, Resource resource, } public void updateUserWeights() { + this.writeLock.lock(); try { - this.writeLock.lock(); for (Map.Entry ue : users.entrySet()) { ue.getValue().setWeight(getUserWeightFromQueue(ue.getKey())); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptionManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptionManager.java index 76fcd4a..ae07087 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptionManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptionManager.java @@ -43,8 +43,8 @@ public PreemptionManager() { } public void refreshQueues(CSQueue parent, CSQueue current) { + writeLock.lock(); try { - writeLock.lock(); PreemptableQueue parentEntity = null; if (parent != null) { parentEntity = entities.get(parent.getQueueName()); @@ -67,8 +67,8 @@ public void refreshQueues(CSQueue parent, CSQueue current) { } public void addKillableContainer(KillableContainer container) { + writeLock.lock(); try { - writeLock.lock(); PreemptableQueue entity = entities.get(container.getLeafQueueName()); if (null != entity) { entity.addKillableContainer(container); @@ -80,8 +80,8 @@ public void addKillableContainer(KillableContainer container) { } public void removeKillableContainer(KillableContainer container) { + writeLock.lock(); try { - writeLock.lock(); PreemptableQueue entity = entities.get(container.getLeafQueueName()); if (null != entity) { entity.removeKillableContainer(container); @@ -106,8 +106,8 @@ public void updateKillableContainerResource(KillableContainer container, @VisibleForTesting public Map getKillableContainersMap( String queueName, String partition) { + readLock.lock(); try { - readLock.lock(); PreemptableQueue entity = entities.get(queueName); if (entity != null) { Map containers = @@ -129,8 +129,8 @@ public void updateKillableContainerResource(KillableContainer container, } public Resource getKillableResource(String queueName, String partition) { + readLock.lock(); try { - readLock.lock(); PreemptableQueue entity = entities.get(queueName); if (entity != null) { Resource res = entity.getTotalKillableResources().get(partition); @@ -147,8 +147,8 @@ public Resource getKillableResource(String queueName, String partition) { } public Map getShallowCopyOfPreemptableQueues() { + readLock.lock(); try { - readLock.lock(); Map map = new HashMap<>(); for (Map.Entry entry : entities.entrySet()) { String key = entry.getKey();
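The UsersManager and PreemptionManager hunks above follow the split the whole series relies on: mutators take the write lock, while getters take the read lock and hand back a copy so callers never iterate live state after the lock is released. A small sketch of that pattern under the same lock-before-try idiom; the registry class and its fields are hypothetical stand-ins, not the Hadoop types:

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical stand-in for the per-queue bookkeeping in this patch.
class KillableResourceRegistry {
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  private final Map<String, Long> resourcesByQueue = new HashMap<>();

  // Mutation: exclusive write lock, acquired before entering try.
  void update(String queue, long resource) {
    lock.writeLock().lock();
    try {
      resourcesByQueue.put(queue, resource);
    } finally {
      lock.writeLock().unlock();
    }
  }

  // Read: shared read lock; return a shallow copy so the caller can
  // iterate the result lock-free, as the shallow-copy getters above do.
  Map<String, Long> snapshot() {
    lock.readLock().lock();
    try {
      return new HashMap<>(resourcesByQueue);
    } finally {
      lock.readLock().unlock();
    }
  }
}

Returning the copy from inside try is safe: the return value is computed while the lock is held, and finally releases the lock before control leaves the method.

diff --git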
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index 17bb104..d35dff3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -173,8 +173,8 @@ public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId, public boolean containerCompleted(RMContainer rmContainer, ContainerStatus containerStatus, RMContainerEventType event, String partition) { + writeLock.lock(); try { - writeLock.lock(); ContainerId containerId = rmContainer.getContainerId(); // Remove from the list of containers @@ -211,8 +211,8 @@ public boolean containerCompleted(RMContainer rmContainer, public RMContainer allocate(FiCaSchedulerNode node, SchedulerRequestKey schedulerKey, Container container) { + readLock.lock(); try { - readLock.lock(); if (isStopped) { return null; @@ -374,8 +374,8 @@ public boolean accept(Resource cluster, List resourceRequests = null; boolean reReservation = false; + readLock.lock(); try { - readLock.lock(); // First make sure no container in release list in final state if (anyContainerInFinalState(request)) { @@ -483,8 +483,8 @@ public void apply(Resource cluster, ResourceCommitRequest request) { boolean reReservation = false; + writeLock.lock(); try { - writeLock.lock(); // If we allocated something if (request.anythingAllocatedOrReserved()) { @@ -577,8 +577,8 @@ public void apply(Resource cluster, public boolean unreserve(SchedulerRequestKey schedulerKey, FiCaSchedulerNode node, RMContainer rmContainer) { + writeLock.lock(); try { - writeLock.lock(); // Done with the reservation? 
if (internalUnreserve(node, schedulerKey)) { node.unreserveResource(this); @@ -633,8 +633,8 @@ private boolean internalUnreserve(FiCaSchedulerNode node, } public Map getTotalPendingRequestsPerPartition() { + readLock.lock(); try { - readLock.lock(); Map ret = new HashMap<>(); for (SchedulerRequestKey schedulerKey : appSchedulingInfo @@ -665,8 +665,8 @@ private boolean internalUnreserve(FiCaSchedulerNode node, } public void markContainerForPreemption(ContainerId cont) { + writeLock.lock(); try { - writeLock.lock(); // ignore already completed containers if (liveContainers.containsKey(cont)) { containersToPreempt.add(cont); @@ -688,8 +688,8 @@ public void markContainerForPreemption(ContainerId cont) { */ public Allocation getAllocation(ResourceCalculator resourceCalculator, Resource clusterResource, Resource minimumAllocation) { + writeLock.lock(); try { - writeLock.lock(); Set currentContPreemption = Collections.unmodifiableSet( new HashSet(containersToPreempt)); containersToPreempt.clear(); @@ -755,8 +755,8 @@ public NodeId getNodeIdToUnreserve( public void setHeadroomProvider( CapacityHeadroomProvider headroomProvider) { + writeLock.lock(); try { - writeLock.lock(); this.headroomProvider = headroomProvider; } finally { writeLock.unlock(); @@ -765,8 +765,8 @@ public void setHeadroomProvider( @Override public Resource getHeadroom() { + readLock.lock(); try { - readLock.lock(); if (headroomProvider != null) { return headroomProvider.getHeadroom(); } @@ -780,8 +780,8 @@ public Resource getHeadroom() { @Override public void transferStateFromPreviousAttempt( SchedulerApplicationAttempt appAttempt) { + writeLock.lock(); try { - writeLock.lock(); super.transferStateFromPreviousAttempt(appAttempt); this.headroomProvider = ((FiCaSchedulerApp) appAttempt).headroomProvider; } finally { @@ -809,8 +809,8 @@ public void reserve(SchedulerRequestKey schedulerKey, FiCaSchedulerNode node, public RMContainer findNodeToUnreserve(Resource clusterResource, FiCaSchedulerNode node, SchedulerRequestKey schedulerKey, Resource minimumUnreservedResource) { + readLock.lock(); try { - readLock.lock(); // need to unreserve some other container first NodeId idToUnreserve = getNodeIdToUnreserve(schedulerKey, minimumUnreservedResource, rc, clusterResource); @@ -978,11 +978,11 @@ public void updateNodeInfoForAMDiagnostics(FiCaSchedulerNode node) { */ @Override public ApplicationResourceUsageReport getResourceUsageReport() { + writeLock.lock(); try { // Use write lock here because // SchedulerApplicationAttempt#getResourceUsageReport updated fields // TODO: improve this - writeLock.lock(); ApplicationResourceUsageReport report = super.getResourceUsageReport(); Resource cluster = rmContext.getScheduler().getClusterResource(); Resource totalPartitionRes = @@ -1045,8 +1045,8 @@ public boolean equals(Object o) { */ public boolean moveReservation(RMContainer reservedContainer, FiCaSchedulerNode sourceNode, FiCaSchedulerNode targetNode) { + writeLock.lock(); try { - writeLock.lock(); if (!sourceNode.getPartition().equals(targetNode.getPartition())) { if (LOG.isDebugEnabled()) { LOG.debug( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java index 5dfef73..9047e41 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java @@ -135,8 +135,8 @@ public QueueMetrics getMetrics() { void containerCompleted(RMContainer rmContainer, ContainerStatus containerStatus, RMContainerEventType event) { + writeLock.lock(); try { - writeLock.lock(); Container container = rmContainer.getContainer(); ContainerId containerId = container.getId(); @@ -183,8 +183,8 @@ void containerCompleted(RMContainer rmContainer, private void unreserveInternal( SchedulerRequestKey schedulerKey, FSSchedulerNode node) { + writeLock.lock(); try { - writeLock.lock(); Map reservedContainers = this.reservedContainers.get( schedulerKey); RMContainer reservedContainer = reservedContainers.remove( @@ -286,8 +286,8 @@ NodeType getAllowedLocalityLevel( return NodeType.OFF_SWITCH; } + writeLock.lock(); try { - writeLock.lock(); // Default level is NODE_LOCAL if (!allowedLocalityLevel.containsKey(schedulerKey)) { @@ -356,8 +356,8 @@ NodeType getAllowedLocalityLevelByTime( return NodeType.OFF_SWITCH; } + writeLock.lock(); try { - writeLock.lock(); // default level is NODE_LOCAL if (!allowedLocalityLevel.containsKey(schedulerKey)) { @@ -427,8 +427,8 @@ public RMContainer allocate(NodeType type, FSSchedulerNode node, RMContainer rmContainer; Container container; + writeLock.lock(); try { - writeLock.lock(); // Update allowed locality level NodeType allowed = allowedLocalityLevel.get(schedulerKey); if (allowed != null) { @@ -500,8 +500,8 @@ public RMContainer allocate(NodeType type, FSSchedulerNode node, void resetAllowedLocalityLevel( SchedulerRequestKey schedulerKey, NodeType level) { NodeType old; + writeLock.lock(); try { - writeLock.lock(); old = allowedLocalityLevel.put(schedulerKey, level); } finally { writeLock.unlock(); @@ -731,8 +731,8 @@ private void setReservation(SchedulerNode node) { String rackName = node.getRackName() == null ? "NULL" : node.getRackName(); + writeLock.lock(); try { - writeLock.lock(); Set rackReservations = reservations.get(rackName); if (rackReservations == null) { rackReservations = new HashSet<>(); @@ -748,8 +748,8 @@ private void clearReservation(SchedulerNode node) { String rackName = node.getRackName() == null ? "NULL" : node.getRackName(); + writeLock.lock(); try { - writeLock.lock(); Set rackReservations = reservations.get(rackName); if (rackReservations != null) { rackReservations.remove(node.getNodeName()); @@ -911,8 +911,8 @@ private Resource assignContainer(FSSchedulerNode node, boolean reserved) { // For each priority, see if we can schedule a node local, rack local // or off-switch request. Rack of off-switch requests may be delayed // (not scheduled) in order to promote better locality. + writeLock.lock(); try { - writeLock.lock(); // TODO (wandga): All logics in this method should be added to // SchedulerPlacement#canDelayTo which is independent from scheduler. 
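Several hunks in this series sit next to the comment "Careful! Locking order is important!". Moving lock() out of the try block does not change that ordering; it only tightens what each finally protects. When two locks nest, the same shape repeats at every level: acquire outside the try, release innermost first, and take the locks in one global order on every code path so two threads cannot deadlock. A sketch with a hypothetical parent/child pair, not the actual queue hierarchy code:

import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical parent/child locks; names are this sketch's own.
class QueuePair {
  private final ReentrantReadWriteLock parentLock = new ReentrantReadWriteLock();
  private final ReentrantReadWriteLock childLock = new ReentrantReadWriteLock();
  private int parentApps;
  private int childApps;

  // Always parent before child; each unlock() is guarded only by the
  // try block entered after its own lock() succeeded, so a failure
  // acquiring childLock still releases parentLock and nothing else.
  void moveApplicationIn() {
    parentLock.writeLock().lock();
    try {
      childLock.writeLock().lock();
      try {
        parentApps++;
        childApps++;
      } finally {
        childLock.writeLock().unlock();
      }
    } finally {
      parentLock.writeLock().unlock();
    }
  }
}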
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index db02bab..9bfed26 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -408,8 +408,8 @@ public void update() { } public ResourceWeights getAppWeight(FSAppAttempt app) { + readLock.lock(); try { - readLock.lock(); double weight = 1.0; if (sizeBasedWeight) { // Set weight based on current memory demand @@ -491,8 +491,8 @@ protected void addApplication(ApplicationId applicationId, return; } + writeLock.lock(); try { - writeLock.lock(); RMApp rmApp = rmContext.getRMApps().get(applicationId); FSLeafQueue queue = assignToQueue(rmApp, queueName, user); if (queue == null) { @@ -543,8 +543,8 @@ protected void addApplicationAttempt( ApplicationAttemptId applicationAttemptId, boolean transferStateFromPreviousAttempt, boolean isAttemptRecovering) { + writeLock.lock(); try { - writeLock.lock(); SchedulerApplication application = applications.get( applicationAttemptId.getApplicationId()); String user = application.getUser(); @@ -649,8 +649,8 @@ private void removeApplication(ApplicationId applicationId, private void removeApplicationAttempt( ApplicationAttemptId applicationAttemptId, RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers) { + writeLock.lock(); try { - writeLock.lock(); LOG.info("Application " + applicationAttemptId + " is done. 
finalState=" + rmAppAttemptFinalState); FSAppAttempt attempt = getApplicationAttempt(applicationAttemptId); @@ -716,8 +716,8 @@ private void removeApplicationAttempt( protected void completedContainerInternal( RMContainer rmContainer, ContainerStatus containerStatus, RMContainerEventType event) { + writeLock.lock(); try { - writeLock.lock(); Container container = rmContainer.getContainer(); // Get the application for the finished container @@ -755,8 +755,8 @@ protected void completedContainerInternal( private void addNode(List containerReports, RMNode node) { + writeLock.lock(); try { - writeLock.lock(); FSSchedulerNode schedulerNode = new FSSchedulerNode(node, usePortForNodeName); nodeTracker.addNode(schedulerNode); @@ -777,8 +777,8 @@ private void addNode(List containerReports, } private void removeNode(RMNode rmNode) { + writeLock.lock(); try { - writeLock.lock(); NodeId nodeId = rmNode.getNodeID(); FSSchedulerNode node = nodeTracker.getNode(nodeId); if (node == null) { @@ -904,8 +904,8 @@ public Allocation allocate(ApplicationAttemptId appAttemptId, @Override protected void nodeUpdate(RMNode nm) { + writeLock.lock(); try { - writeLock.lock(); long start = getClock().getTime(); eventLog.log("HEARTBEAT", nm.getHostName()); super.nodeUpdate(nm); @@ -1005,8 +1005,8 @@ static void assignPreemptedContainers(FSSchedulerNode node) { @VisibleForTesting void attemptScheduling(FSSchedulerNode node) { + writeLock.lock(); try { - writeLock.lock(); if (rmContext.isWorkPreservingRecoveryEnabled() && !rmContext .isSchedulerReadyForAllocatingContainers()) { return; @@ -1205,8 +1205,8 @@ public void handle(SchedulerEvent event) { private String resolveReservationQueueName(String queueName, ApplicationId applicationId, ReservationId reservationID, boolean isRecovering) { + readLock.lock(); try { - readLock.lock(); FSQueue queue = queueMgr.getQueue(queueName); if ((queue == null) || !allocConf.isReservable(queue.getQueueName())) { return queueName; @@ -1269,8 +1269,8 @@ public void setRMContext(RMContext rmContext) { } private void initScheduler(Configuration conf) throws IOException { + writeLock.lock(); try { - writeLock.lock(); this.conf = new FairSchedulerConfiguration(conf); validateConf(this.conf); authorizer = YarnAuthorizationProvider.getInstance(conf); @@ -1365,8 +1365,8 @@ private void updateReservationThreshold() { } private void startSchedulerThreads() { + writeLock.lock(); try { - writeLock.lock(); Preconditions.checkNotNull(updateThread, "updateThread is null"); Preconditions.checkNotNull(allocsLoader, "allocsLoader is null"); updateThread.start(); @@ -1398,8 +1398,8 @@ public void serviceStart() throws Exception { @Override public void serviceStop() throws Exception { + writeLock.lock(); try { - writeLock.lock(); if (updateThread != null) { updateThread.interrupt(); updateThread.join(THREAD_JOIN_TIMEOUT_MS); @@ -1464,8 +1464,8 @@ public int getNumClusterNodes() { @Override public boolean checkAccess(UserGroupInformation callerUGI, QueueACL acl, String queueName) { + readLock.lock(); try { - readLock.lock(); FSQueue queue = getQueueManager().getQueue(queueName); if (queue == null) { if (LOG.isDebugEnabled()) { @@ -1569,8 +1569,8 @@ private void applyChildDefaults() { @Override public String moveApplication(ApplicationId appId, String queueName) throws YarnException { + writeLock.lock(); try { - writeLock.lock(); SchedulerApplication app = applications.get(appId); if (app == null) { throw new YarnException("App to be moved " + appId + " not found."); @@ -1615,8 +1615,8 @@ public String 
moveApplication(ApplicationId appId, @Override public void preValidateMoveApplication(ApplicationId appId, String newQueue) throws YarnException { + writeLock.lock(); try { - writeLock.lock(); SchedulerApplication app = applications.get(appId); if (app == null) { throw new YarnException("App to be moved " + appId + " not found."); @@ -1747,8 +1747,8 @@ FSQueue findLowestCommonAncestorQueue(FSQueue queue1, FSQueue queue2) { @Override public void updateNodeResource(RMNode nm, ResourceOption resourceOption) { + writeLock.lock(); try { - writeLock.lock(); super.updateNodeResource(nm, resourceOption); updateRootQueueMetrics(); queueMgr.getRootQueue().setSteadyFairShare(getClusterResource()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java index d932e0e..001c12b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java @@ -52,9 +52,8 @@ public RMContainer allocate(NodeType type, FiCaSchedulerNode node, SchedulerRequestKey schedulerKey, Container container) { + writeLock.lock(); try { - writeLock.lock(); - if (isStopped) { return null; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalitySchedulingPlacementSet.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalitySchedulingPlacementSet.java index 6cc8cc7..4c8c605 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalitySchedulingPlacementSet.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalitySchedulingPlacementSet.java @@ -119,8 +119,8 @@ private void updateNodeLabels(ResourceRequest request) { public ResourceRequestUpdateResult updateResourceRequests( Collection requests, boolean recoverPreemptedRequestForAContainer) { + this.writeLock.lock(); try { - this.writeLock.lock(); ResourceRequestUpdateResult updateResult = null; @@ -170,8 +170,8 @@ private ResourceRequest getResourceRequest(String resourceName) { @Override public PendingAsk getPendingAsk(String resourceName) { + readLock.lock(); try { - readLock.lock(); ResourceRequest request = getResourceRequest(resourceName); if (null == request) { return PendingAsk.ZERO; @@ -187,8 +187,8 @@ public PendingAsk getPendingAsk(String resourceName) { @Override public int getOutstandingAsksCount(String resourceName) { + readLock.lock(); try { - readLock.lock(); ResourceRequest request = getResourceRequest(resourceName); if (null == request) { return 0; @@ -301,8 +301,8 @@ private void decResourceRequest(String resourceName, @Override public boolean 
canAllocate(NodeType type, SchedulerNode node) { + readLock.lock(); try { - readLock.lock(); ResourceRequest r = resourceRequestMap.get( ResourceRequest.ANY); if (r == null || r.getNumContainers() <= 0) { @@ -329,8 +329,8 @@ public boolean canAllocate(NodeType type, SchedulerNode node) { @Override public boolean canDelayTo(String resourceName) { + readLock.lock(); try { - readLock.lock(); ResourceRequest request = getResourceRequest(resourceName); return request == null || request.getRelaxLocality(); } finally { @@ -406,8 +406,8 @@ public void showRequests() { @Override public Iterator getAcceptedResouceNames() { + readLock.lock(); try { - readLock.lock(); return resourceRequestMap.keySet().iterator(); } finally { readLock.unlock(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java index 956391e..b1cec85 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java @@ -191,8 +191,8 @@ public void run() { public NMToken createAndGetNMToken(String applicationSubmitter, ApplicationAttemptId appAttemptId, Container container) { + this.writeLock.lock(); try { - this.writeLock.lock(); HashSet nodeSet = this.appAttemptToNodeKeyMap.get(appAttemptId); NMToken nmToken = null; if (nodeSet != null) { @@ -213,8 +213,8 @@ public NMToken createAndGetNMToken(String applicationSubmitter, } public void registerApplicationAttempt(ApplicationAttemptId appAttemptId) { + this.writeLock.lock(); try { - this.writeLock.lock(); this.appAttemptToNodeKeyMap.put(appAttemptId, new HashSet()); } finally { this.writeLock.unlock(); @@ -225,8 +225,8 @@ public void registerApplicationAttempt(ApplicationAttemptId appAttemptId) { @VisibleForTesting public boolean isApplicationAttemptRegistered( ApplicationAttemptId appAttemptId) { + this.readLock.lock(); try { - this.readLock.lock(); return this.appAttemptToNodeKeyMap.containsKey(appAttemptId); } finally { this.readLock.unlock(); @@ -237,8 +237,8 @@ public boolean isApplicationAttemptRegistered( @VisibleForTesting public boolean isApplicationAttemptNMTokenPresent( ApplicationAttemptId appAttemptId, NodeId nodeId) { + this.readLock.lock(); try { - this.readLock.lock(); HashSet nodes = this.appAttemptToNodeKeyMap.get(appAttemptId); if (nodes != null && nodes.contains(nodeId)) { return true; @@ -251,8 +251,8 @@ public boolean isApplicationAttemptNMTokenPresent( } public void unregisterApplicationAttempt(ApplicationAttemptId appAttemptId) { + this.writeLock.lock(); try { - this.writeLock.lock(); this.appAttemptToNodeKeyMap.remove(appAttemptId); } finally { this.writeLock.unlock(); @@ -265,8 +265,8 @@ public void unregisterApplicationAttempt(ApplicationAttemptId appAttemptId) { * @param nodeId */ public void removeNodeKey(NodeId nodeId) { + this.writeLock.lock(); try { - this.writeLock.lock(); Iterator> appNodeKeySetIterator = this.appAttemptToNodeKeyMap.values().iterator(); while (appNodeKeySetIterator.hasNext()) {