From 6eb54fa975853f2cf52a80cb5bc5f6829c0db92d Mon Sep 17 00:00:00 2001 From: Prabhu Joseph Date: Mon, 4 Mar 2019 13:03:48 +0530 Subject: [PATCH] YARN-9341 --- .../apache/hadoop/yarn/service/ServiceManager.java | 2 +- .../hadoop/yarn/service/component/Component.java | 2 +- .../component/instance/ComponentInstance.java | 10 +-- .../client/api/impl/FileSystemTimelineWriter.java | 46 +++++++------- .../yarn/nodelabels/CommonNodeLabelsManager.java | 20 +++--- .../nodelabels/NonAppendableFSNodeLabelStore.java | 6 +- .../yarn/security/ConfiguredYarnAuthorizer.java | 4 +- .../yarn/server/timeline/LeveldbTimelineStore.java | 6 +- .../yarn/server/nodemanager/ContainerExecutor.java | 9 ++- .../application/ApplicationImpl.java | 2 +- .../containermanager/container/ContainerImpl.java | 5 +- .../linux/resources/CGroupsHandlerImpl.java | 9 ++- .../localizer/LocalizedResource.java | 5 +- .../logaggregation/TestLogAggregationService.java | 2 +- .../monitor/capacity/FifoCandidatesSelector.java | 2 +- .../capacity/IntraQueueCandidatesSelector.java | 2 +- .../ProportionalCapacityPreemptionPolicy.java | 5 +- .../nodelabels/NodeAttributesManagerImpl.java | 17 +++-- .../nodelabels/RMNodeLabelsManager.java | 35 +++++------ .../placement/PlacementManager.java | 6 +- .../server/resourcemanager/rmapp/RMAppImpl.java | 8 +-- .../rmapp/attempt/RMAppAttemptImpl.java | 6 +- .../rmapp/attempt/RMAppAttemptMetrics.java | 4 +- .../rmcontainer/RMContainerImpl.java | 27 ++++---- .../server/resourcemanager/rmnode/RMNodeImpl.java | 5 +- .../scheduler/AbstractResourceUsage.java | 14 ++--- .../scheduler/AbstractYarnScheduler.java | 10 +-- .../scheduler/AppSchedulingInfo.java | 27 ++++---- .../resourcemanager/scheduler/ResourceUsage.java | 4 +- .../scheduler/SchedulerApplicationAttempt.java | 53 ++++++++-------- .../capacity/AbstractAutoCreatedLeafQueue.java | 2 +- .../scheduler/capacity/AbstractCSQueue.java | 23 ++++--- .../capacity/AbstractManagedParentQueue.java | 13 ++-- .../scheduler/capacity/AutoCreatedLeafQueue.java | 5 +- .../scheduler/capacity/CapacityScheduler.java | 66 ++++++++++---------- .../scheduler/capacity/LeafQueue.java | 72 +++++++++++----------- .../scheduler/capacity/ManagedParentQueue.java | 16 ++--- .../scheduler/capacity/ParentQueue.java | 39 ++++++------ .../scheduler/capacity/PlanQueue.java | 2 +- .../scheduler/capacity/QueueCapacities.java | 12 ++-- .../scheduler/capacity/ReservationQueue.java | 2 +- .../scheduler/capacity/UsersManager.java | 49 ++++++++------- .../capacity/preemption/PreemptionManager.java | 12 ++-- .../GuaranteedOrZeroCapacityOverTimePolicy.java | 23 ++++--- .../scheduler/common/fica/FiCaSchedulerApp.java | 28 ++++----- .../MemoryPlacementConstraintManager.java | 22 +++---- .../scheduler/fair/FSAppAttempt.java | 21 +++---- .../scheduler/fifo/FifoAppAttempt.java | 4 +- .../placement/LocalityAppPlacementAllocator.java | 14 ++--- .../SingleConstraintAppPlacementAllocator.java | 4 +- .../security/NMTokenSecretManagerInRM.java | 12 ++-- .../volume/csi/lifecycle/VolumeImpl.java | 6 +- 52 files changed, 388 insertions(+), 412 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java index aefdadd..3c8fed6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java @@ -117,8 +117,8 @@ public ServiceManager(ServiceContext context) { @Override public void handle(ServiceEvent event) { + writeLock.lock(); try { - writeLock.lock(); State oldState = getState(); try { stateMachine.doTransition(event.getType(), event); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java index 8958dc7..cbc489c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java @@ -1090,8 +1090,8 @@ public ServiceScheduler getScheduler() { @Override public void handle(ComponentEvent event) { + writeLock.lock(); try { - writeLock.lock(); ComponentState oldState = getState(); try { stateMachine.doTransition(event.getType(), event); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java index f44cd6e..700408e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java @@ -751,8 +751,8 @@ public void setContainerState(ContainerState state) { @Override public void handle(ComponentInstanceEvent event) { + writeLock.lock(); try { - writeLock.lock(); ComponentInstanceState oldState = getState(); try { stateMachine.doTransition(event.getType(), event); @@ -782,8 +782,8 @@ public String getCompInstanceName() { void updateLocalizationStatuses( List statuses) { Map resourcesCpy = new HashMap<>(); + readLock.lock(); try { - readLock.lock(); if (resolvedParams == null || resolvedParams.didLaunchFail() || resolvedParams.getResolvedRsrcPaths() == null || resolvedParams.getResolvedRsrcPaths().isEmpty()) { @@ -823,8 +823,8 @@ void updateLocalizationStatuses( public void updateResolvedLaunchParams( Future future) { + writeLock.lock(); try { - writeLock.lock(); this.resolvedParams = future.get(); } catch (InterruptedException | ExecutionException e) { LOG.error("{} updating resolved params", getCompInstanceId(), e); @@ -834,8 +834,8 @@ public void updateResolvedLaunchParams( } public ContainerStatus getContainerStatus() { + readLock.lock(); try { - readLock.lock(); return status; } finally { readLock.unlock(); @@ -844,8 +844,8 @@ public ContainerStatus getContainerStatus() { private void setContainerStatus(ContainerId containerId, ContainerStatus latestStatus) { + writeLock.lock(); try { - writeLock.lock(); this.status = latestStatus; org.apache.hadoop.yarn.service.api.records.Container containerRec = getCompSpec().getContainer(containerId.toString()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java index c00a0b8..d4043ee 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java @@ -478,8 +478,8 @@ public LogFDsCache(long flushIntervalSecs, long cleanIntervalSecs, @Override public void flush() throws IOException { + this.domainFDLocker.lock(); try { - this.domainFDLocker.lock(); if (domainLogFD != null) { domainLogFD.flush(); } @@ -494,8 +494,8 @@ public void flush() throws IOException { private Map copySummaryLogFDs( Map summanyLogFDsToCopy) { + summaryTableCopyLocker.lock(); try { - summaryTableCopyLocker.lock(); return new HashMap( summanyLogFDsToCopy); } finally { @@ -506,8 +506,8 @@ public void flush() throws IOException { private Map> copyEntityLogFDs(Map> entityLogFDsToCopy) { + entityTableCopyLocker.lock(); try { - entityTableCopyLocker.lock(); return new HashMap>(entityLogFDsToCopy); } finally { @@ -521,8 +521,8 @@ private void flushSummaryFDMap(Map logFDEntry : logFDs .entrySet()) { EntityLogFD logFD = logFDEntry.getValue(); + logFD.lock(); try { - logFD.lock(); logFD.flush(); } finally { logFD.unlock(); @@ -541,8 +541,8 @@ private void flushEntityFDMap(Map logFDEntry : logFDMap.entrySet()) { EntityLogFD logFD = logFDEntry.getValue(); + logFD.lock(); try { - logFD.lock(); logFD.flush(); } finally { logFD.unlock(); @@ -567,8 +567,8 @@ public void run() { private void cleanInActiveFDs() { long currentTimeStamp = Time.monotonicNow(); + this.domainFDLocker.lock(); try { - this.domainFDLocker.lock(); if (domainLogFD != null) { if (currentTimeStamp - domainLogFD.getLastModifiedTime() >= ttl) { domainLogFD.close(); @@ -593,8 +593,8 @@ private void cleanInActiveSummaryFDsforMap( for (Entry logFDEntry : logFDs .entrySet()) { EntityLogFD logFD = logFDEntry.getValue(); + logFD.lock(); try { - logFD.lock(); if (currentTimeStamp - logFD.getLastModifiedTime() >= ttl) { logFD.close(); } @@ -617,8 +617,8 @@ private void cleanInActiveEntityFDsforMap(Map logFDEntry : logFDMap.entrySet()) { EntityLogFD logFD = logFDEntry.getValue(); + logFD.lock(); try { - logFD.lock(); if (currentTimeStamp - logFD.getLastModifiedTime() >= ttl) { logFD.close(); } @@ -644,8 +644,8 @@ public void run() { private class TimerMonitorTask extends TimerTask { @Override public void run() { + timerTasksMonitorWriteLock.lock(); try { - timerTasksMonitorWriteLock.lock(); monitorTimerTasks(); } finally { timerTasksMonitorWriteLock.unlock(); @@ -691,8 +691,8 @@ private void cancelAndCloseTimerTasks() { monitorTaskTimer = null; } + this.domainFDLocker.lock(); try { - this.domainFDLocker.lock(); if (domainLogFD != null) { domainLogFD.close(); domainLogFD = null; @@ -708,8 +708,8 @@ private void cancelAndCloseTimerTasks() { private void closeEntityFDs(Map> logFDs) { + entityTableLocker.lock(); try { - entityTableLocker.lock(); if (!logFDs.isEmpty()) { for (Entry> logFDMapEntry : logFDs.entrySet()) { @@ -734,8 +734,8 @@ private void closeEntityFDs(Map logFDs) { + summaryTableLocker.lock(); try { - summaryTableLocker.lock(); if (!logFDs.isEmpty()) { for (Entry logFDEntry : logFDs.entrySet()) { @@ -757,8 +757,8 @@ public void writeDomainLog(FileSystem fs, Path logPath, ObjectMapper objMapper, TimelineDomain domain, boolean isAppendSupported) throws IOException { checkAndStartTimeTasks(); + this.domainFDLocker.lock(); try { - this.domainFDLocker.lock(); if (this.domainLogFD != null) { this.domainLogFD.writeDomain(domain); } else { @@ -790,8 +790,8 @@ private void writeEntityLogs(FileSystem fs, Path logPath, if (logMapFD != null) { EntityLogFD logFD = logMapFD.get(groupId); if (logFD != null) { + logFD.lock(); try { - logFD.lock(); if (serviceStopped) { return; } @@ -814,8 +814,8 @@ private void createEntityFDandWrite(FileSystem fs, Path logPath, TimelineEntityGroupId groupId, List entities, boolean isAppendSupported, Map> logFDs) throws IOException{ + entityTableLocker.lock(); try { - entityTableLocker.lock(); if (serviceStopped) { return; } @@ -828,11 +828,11 @@ private void createEntityFDandWrite(FileSystem fs, Path logPath, if (logFD == null) { logFD = new EntityLogFD(fs, logPath, objMapper, isAppendSupported); } + logFD.lock(); try { - logFD.lock(); logFD.writeEntities(entities); + entityTableCopyLocker.lock(); try { - entityTableCopyLocker.lock(); logFDMap.put(groupId, logFD); logFDs.put(attemptId, logFDMap); } finally { @@ -862,8 +862,8 @@ private void writeSummmaryEntityLogs(FileSystem fs, Path logPath, EntityLogFD logFD = null; logFD = logFDs.get(attemptId); if (logFD != null) { + logFD.lock(); try { - logFD.lock(); if (serviceStopped) { return; } @@ -881,8 +881,8 @@ private void createSummaryFDAndWrite(FileSystem fs, Path logPath, ObjectMapper objMapper, ApplicationAttemptId attemptId, List entities, boolean isAppendSupported, Map logFDs) throws IOException { + summaryTableLocker.lock(); try { - summaryTableLocker.lock(); if (serviceStopped) { return; } @@ -890,11 +890,11 @@ private void createSummaryFDAndWrite(FileSystem fs, Path logPath, if (logFD == null) { logFD = new EntityLogFD(fs, logPath, objMapper, isAppendSupported); } + logFD.lock(); try { - logFD.lock(); logFD.writeEntities(entities); + summaryTableCopyLocker.lock(); try { - summaryTableCopyLocker.lock(); logFDs.put(attemptId, logFD); } finally { summaryTableCopyLocker.unlock(); @@ -928,12 +928,12 @@ private void createAndStartTimerTasks() { } private void checkAndStartTimeTasks() { + this.timerTasksMonitorReadLock.lock(); try { - this.timerTasksMonitorReadLock.lock(); this.timeStampOfLastWrite = Time.monotonicNow(); if(!timerTaskStarted) { + timerTaskLocker.lock(); try { - timerTaskLocker.lock(); if (!timerTaskStarted) { createAndStartTimerTasks(); timerTaskStarted = true; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java index 19254c1..3345275 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java @@ -765,8 +765,8 @@ public void replaceLabelsOnNode(Map> replaceLabelsToNode) @SuppressWarnings("unchecked") private Map> generateNodeLabelsInfoPerNode(Class type) { + readLock.lock(); try { - readLock.lock(); Map> nodeToLabels = new HashMap<>(); for (Entry entry : nodeCollections.entrySet()) { String hostName = entry.getKey(); @@ -808,8 +808,8 @@ public void replaceLabelsOnNode(Map> replaceLabelsToNode) * @return set of nodes with no labels */ public Set getNodesWithoutALabel() { + readLock.lock(); try { - readLock.lock(); Set nodes = new HashSet<>(); for (Host host : nodeCollections.values()) { for (NodeId nodeId : host.nms.keySet()) { @@ -831,8 +831,8 @@ public void replaceLabelsOnNode(Map> replaceLabelsToNode) * @return labels to nodes map */ public Map> getLabelsToNodes() { + readLock.lock(); try { - readLock.lock(); return getLabelsToNodes(labelCollections.keySet()); } finally { readLock.unlock(); @@ -847,8 +847,8 @@ public void replaceLabelsOnNode(Map> replaceLabelsToNode) * @return labels to nodes map */ public Map> getLabelsToNodes(Set labels) { + readLock.lock(); try { - readLock.lock(); Map> labelsToNodes = getLabelsToNodesMapping(labels, String.class); return Collections.unmodifiableMap(labelsToNodes); @@ -864,8 +864,8 @@ public void replaceLabelsOnNode(Map> replaceLabelsToNode) * @return labels to nodes map */ public Map> getLabelsInfoToNodes() { + readLock.lock(); try { - readLock.lock(); return getLabelsInfoToNodes(labelCollections.keySet()); } finally { readLock.unlock(); @@ -881,8 +881,8 @@ public void replaceLabelsOnNode(Map> replaceLabelsToNode) * @return labels to nodes map */ public Map> getLabelsInfoToNodes(Set labels) { + readLock.lock(); try { - readLock.lock(); Map> labelsToNodes = getLabelsToNodesMapping( labels, NodeLabel.class); return Collections.unmodifiableMap(labelsToNodes); @@ -921,8 +921,8 @@ public void replaceLabelsOnNode(Map> replaceLabelsToNode) * @return existing valid labels in repository */ public Set getClusterNodeLabelNames() { + readLock.lock(); try { - readLock.lock(); Set labels = new HashSet(labelCollections.keySet()); labels.remove(NO_LABEL); return Collections.unmodifiableSet(labels); @@ -932,8 +932,8 @@ public void replaceLabelsOnNode(Map> replaceLabelsToNode) } public List getClusterNodeLabels() { + readLock.lock(); try { - readLock.lock(); List nodeLabels = new ArrayList<>(); for (RMNodeLabel label : labelCollections.values()) { if (!label.getLabelName().equals(NO_LABEL)) { @@ -951,8 +951,8 @@ public boolean isExclusiveNodeLabel(String nodeLabel) throws IOException { if (nodeLabel.equals(NO_LABEL)) { return noNodeLabel.getIsExclusive(); } + readLock.lock(); try { - readLock.lock(); RMNodeLabel label = labelCollections.get(nodeLabel); if (label == null) { String message = @@ -1047,8 +1047,8 @@ protected Node getNMInNodeSet(NodeId nodeId, Map map, } public Set getLabelsInfoByNode(NodeId nodeId) { + readLock.lock(); try { - readLock.lock(); Set labels = getLabelsByNode(nodeId, nodeCollections); if (labels.isEmpty()) { return EMPTY_NODELABEL_SET; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NonAppendableFSNodeLabelStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NonAppendableFSNodeLabelStore.java index 6747037..eae789f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NonAppendableFSNodeLabelStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NonAppendableFSNodeLabelStore.java @@ -93,10 +93,10 @@ public void removeClusterNodeLabels(Collection labels) private void writeNewMirror() throws IOException { ReentrantReadWriteLock.ReadLock readLock = manager.readLock; + // Acquire readlock to make sure we get cluster node labels and + // node-to-labels mapping atomically. + readLock.lock(); try { - // Acquire readlock to make sure we get cluster node labels and - // node-to-labels mapping atomically. - readLock.lock(); // Write mirror to mirror.new.tmp file Path newTmpPath = new Path(fsWorkingPath, MIRROR_FILENAME + ".new.tmp"); try (FSDataOutputStream os = fs.create(newTmpPath, true)) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ConfiguredYarnAuthorizer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ConfiguredYarnAuthorizer.java index 36c5214..615ecb0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ConfiguredYarnAuthorizer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ConfiguredYarnAuthorizer.java @@ -57,8 +57,8 @@ public void init(Configuration conf) { @Override public void setPermission(List permissions, UserGroupInformation user) { + writeLock.lock(); try { - writeLock.lock(); for (Permission perm : permissions) { allAcls.put(perm.getTarget(), perm.getAcls()); } @@ -94,8 +94,8 @@ private boolean checkPermissionInternal(AccessType accessType, @Override public boolean checkPermission(AccessRequest accessRequest) { + readLock.lock(); try { - readLock.lock(); return checkPermissionInternal(accessRequest.getAccessType(), accessRequest.getEntity(), accessRequest.getUser()); } finally { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/LeveldbTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/LeveldbTimelineStore.java index e3db1dc..c9ce936 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/LeveldbTimelineStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/LeveldbTimelineStore.java @@ -986,8 +986,8 @@ private static void writePrimaryFilterEntries(WriteBatch writeBatch, @Override public TimelinePutResponse put(TimelineEntities entities) { + deleteLock.readLock().lock(); try { - deleteLock.readLock().lock(); TimelinePutResponse response = new TimelinePutResponse(); for (TimelineEntity entity : entities.getEntities()) { put(entity, response, false); @@ -1001,8 +1001,8 @@ public TimelinePutResponse put(TimelineEntities entities) { @Private @VisibleForTesting public TimelinePutResponse putWithNoDomainId(TimelineEntities entities) { + deleteLock.readLock().lock(); try { - deleteLock.readLock().lock(); TimelinePutResponse response = new TimelinePutResponse(); for (TimelineEntity entity : entities.getEntities()) { put(entity, response, true); @@ -1525,8 +1525,8 @@ void discardOldEntities(long timestamp) LeveldbIterator iterator = null; LeveldbIterator pfIterator = null; long typeCount = 0; + deleteLock.writeLock().lock(); try { - deleteLock.writeLock().lock(); iterator = getDbIterator(false); pfIterator = getDbIterator(false); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java index a87c494..61e4364 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java @@ -569,8 +569,8 @@ protected void logOutput(String output) { * @return the path of the pid-file for the given containerId. */ protected Path getPidFilePath(ContainerId containerId) { + readLock.lock(); try { - readLock.lock(); return (this.pidFiles.get(containerId)); } finally { readLock.unlock(); @@ -720,9 +720,8 @@ protected Path getPidFilePath(ContainerId containerId) { * @return true if the container is active */ protected boolean isContainerActive(ContainerId containerId) { + readLock.lock(); try { - readLock.lock(); - return (this.pidFiles.containsKey(containerId)); } finally { readLock.unlock(); @@ -742,8 +741,8 @@ protected String getNMEnvVar(String varname) { * of the launched process */ public void activateContainer(ContainerId containerId, Path pidFilePath) { + writeLock.lock(); try { - writeLock.lock(); this.pidFiles.put(containerId, pidFilePath); } finally { writeLock.unlock(); @@ -778,8 +777,8 @@ public void activateContainer(ContainerId containerId, Path pidFilePath) { * @param containerId the container ID */ public void deactivateContainer(ContainerId containerId) { + writeLock.lock(); try { - writeLock.lock(); this.pidFiles.remove(containerId); } finally { writeLock.unlock(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java index ad995fb..5f02e33 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java @@ -667,8 +667,8 @@ public String toString() { @VisibleForTesting public LogAggregationContext getLogAggregationContext() { + this.readLock.lock(); try { - this.readLock.lock(); return this.logAggregationContext; } finally { this.readLock.unlock(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index 8aa8d07..d25206c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -953,8 +953,8 @@ private void clearIpAndHost() { @Override public void setIpAndHost(String[] ipAndHost) { + this.writeLock.lock(); try { - this.writeLock.lock(); this.ips = ipAndHost[0]; this.host = ipAndHost[1]; } finally { @@ -2107,9 +2107,8 @@ public void transition(ContainerImpl container, ContainerEvent event) { @Override public void handle(ContainerEvent event) { + this.writeLock.lock(); try { - this.writeLock.lock(); - ContainerId containerID = event.getContainerID(); if (LOG.isDebugEnabled()) { LOG.debug("Processing " + containerID + " of type " + event.getType()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandlerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandlerImpl.java index d2ec207..4fa6c02 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandlerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandlerImpl.java @@ -130,8 +130,8 @@ private void init() throws ResourceHandlerException { @Override public String getControllerPath(CGroupController controller) { + rwLock.readLock().lock(); try { - rwLock.readLock().lock(); return controllerPaths.get(controller); } finally { rwLock.readLock().unlock(); @@ -169,8 +169,8 @@ private void initializeControllerPaths() throws ResourceHandlerException { } // we want to do a bulk update without the paths changing concurrently + rwLock.writeLock().lock(); try { - rwLock.writeLock().lock(); controllerPaths = cPaths; parsedMtab = newMtab; } finally { @@ -293,10 +293,9 @@ private void mountCGroupController(CGroupController controller) if (existingMountPath == null || !requestedMountPath.equals(existingMountPath)) { + //lock out other readers/writers till we are done + rwLock.writeLock().lock(); try { - //lock out other readers/writers till we are done - rwLock.writeLock().lock(); - // If the controller was already mounted we have to mount it // with the same options to clone the mount point otherwise // the operation will fail diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java index 7cca7cf..25990d6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java @@ -116,8 +116,8 @@ public String toString() { .append(getState() == ResourceState.LOCALIZED ? getLocalPath() + "," + getSize() : "pending").append(",["); + this.readLock.lock(); try { - this.readLock.lock(); for (ContainerId c : ref) { sb.append("(").append(c.toString()).append(")"); } @@ -187,9 +187,8 @@ public void unlock() { @Override public void handle(ResourceEvent event) { + this.writeLock.lock(); try { - this.writeLock.lock(); - Path resourcePath = event.getLocalResourceRequest().getPath(); if (LOG.isDebugEnabled()) { LOG.debug("Processing " + resourcePath + " of type " + event.getType()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java index 1130c0c..000b73b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java @@ -1194,8 +1194,8 @@ public void testFixedSizeThreadPool() throws Exception { final Lock rLock = rwLock.readLock(); final Lock wLock = rwLock.writeLock(); + wLock.lock(); try { - wLock.lock(); Runnable runnable = new Runnable() { @Override public void run() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java index c2735f1..630aa6f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java @@ -96,8 +96,8 @@ .getResToObtainByPartitionForLeafQueue(preemptionContext, queueName, clusterResource); + leafQueue.getReadLock().lock(); try { - leafQueue.getReadLock().lock(); // go through all ignore-partition-exclusivity containers first to make // sure such containers will be preemptionCandidates first Map> ignorePartitionExclusivityContainers = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java index c52fd95..6671eb6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java @@ -178,8 +178,8 @@ public int compare(TempAppPerPartition ta1, TempAppPerPartition ta2) { // 7. Based on the selected resource demand per partition, select // containers with known policy from inter-queue preemption. + leafQueue.getReadLock().lock(); try { - leafQueue.getReadLock().lock(); for (FiCaSchedulerApp app : apps) { preemptFromLeastStarvedApp(leafQueue, app, selectedCandidates, curCandidates, clusterResource, totalPreemptedResourceAllowed, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java index 2156b09..6429f69 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java @@ -566,10 +566,9 @@ private TempQueuePerPartition cloneQueues(CSQueue curQueue, Resource partitionResource, String partitionToLookAt) { TempQueuePerPartition ret; ReadLock readLock = curQueue.getReadLock(); + // Acquire a read lock from Parent/LeafQueue. + readLock.lock(); try { - // Acquire a read lock from Parent/LeafQueue. - readLock.lock(); - String queueName = curQueue.getQueueName(); QueueCapacities qc = curQueue.getQueueCapacities(); float absCap = qc.getAbsoluteCapacity(partitionToLookAt); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java index 99272d4..3ec033f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java @@ -158,9 +158,8 @@ protected void internalUpdateAttributesOnNodes( AttributeMappingOperationType op, Map newAttributesToBeAdded, String attributePrefix) { + writeLock.lock(); try { - writeLock.lock(); - // shows node->attributes Mapped as part of this operation. StringBuilder logMsg = new StringBuilder(op.name()); logMsg.append(" attributes on nodes:"); @@ -406,8 +405,8 @@ protected String normalizeAttributeValue(String value) { public Map> getAttributesToNodes( Set attributes) { + readLock.lock(); try { - readLock.lock(); boolean fetchAllAttributes = (attributes == null || attributes.isEmpty()); Map> attributesToNodes = new HashMap<>(); @@ -426,8 +425,8 @@ protected String normalizeAttributeValue(String value) { } public Resource getResourceByAttribute(NodeAttribute attribute) { + readLock.lock(); try { - readLock.lock(); return clusterAttributes.containsKey(attribute.getAttributeKey()) ? clusterAttributes.get(attribute.getAttributeKey()).getResource() : Resource.newInstance(0, 0); @@ -439,8 +438,8 @@ public Resource getResourceByAttribute(NodeAttribute attribute) { @Override public Map getAttributesForNode( String hostName) { + readLock.lock(); try { - readLock.lock(); return nodeCollections.containsKey(hostName) ? nodeCollections.get(hostName).getAttributes() : new HashMap<>(); @@ -451,8 +450,8 @@ public Resource getResourceByAttribute(NodeAttribute attribute) { @Override public List getNodeToAttributes(Set prefix) { + readLock.lock(); try { - readLock.lock(); List nodeToAttributes = new ArrayList<>(); nodeCollections.forEach((k, v) -> { List attrs; @@ -479,8 +478,8 @@ public Resource getResourceByAttribute(NodeAttribute attribute) { @Override public Map> getNodesToAttributes( Set hostNames) { + readLock.lock(); try { - readLock.lock(); boolean fetchAllNodes = (hostNames == null || hostNames.isEmpty()); Map> nodeToAttrs = new HashMap<>(); if (fetchAllNodes) { @@ -501,8 +500,8 @@ public Resource getResourceByAttribute(NodeAttribute attribute) { } public void activateNode(NodeId nodeId, Resource resource) { + writeLock.lock(); try { - writeLock.lock(); String hostName = nodeId.getHost(); Host host = nodeCollections.get(hostName); if (host == null) { @@ -519,8 +518,8 @@ public void activateNode(NodeId nodeId, Resource resource) { } public void deactivateNode(NodeId nodeId) { + writeLock.lock(); try { - writeLock.lock(); Host host = nodeCollections.get(nodeId.getHost()); for (NodeAttribute attribute : host.getAttributes().keySet()) { clusterAttributes.get(attribute.getAttributeKey()) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java index 507f696..370a2f0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java @@ -70,10 +70,9 @@ protected void serviceInit(Configuration conf) throws Exception { @Override public void addLabelsToNode(Map> addedLabelsToNode) - throws IOException { + throws IOException { + writeLock.lock(); try { - writeLock.lock(); - // get nodesCollection before edition Map before = cloneNodeMap(addedLabelsToNode.keySet()); @@ -112,8 +111,8 @@ protected void checkRemoveFromClusterNodeLabelsOfQueue( @Override public void removeFromClusterNodeLabels(Collection labelsToRemove) throws IOException { + writeLock.lock(); try { - writeLock.lock(); if (!isInitNodeLabelStoreInProgress()) { // We cannot remove node labels from collection when some queue(s) are // using any of them. @@ -137,8 +136,8 @@ public void removeFromClusterNodeLabels(Collection labelsToRemove) @Override public void addToCluserNodeLabels(Collection labels) throws IOException { + writeLock.lock(); try { - writeLock.lock(); super.addToCluserNodeLabels(labels); } finally { writeLock.unlock(); @@ -149,9 +148,8 @@ public void addToCluserNodeLabels(Collection labels) public void removeLabelsFromNode(Map> removeLabelsFromNode) throws IOException { + writeLock.lock(); try { - writeLock.lock(); - // get nodesCollection before edition Map before = cloneNodeMap(removeLabelsFromNode.keySet()); @@ -171,9 +169,8 @@ public void addToCluserNodeLabels(Collection labels) @Override public void replaceLabelsOnNode(Map> replaceLabelsToNode) throws IOException { + writeLock.lock(); try { - writeLock.lock(); - Map> effectiveModifiedLabelMappings = getModifiedNodeLabelsMappings(replaceLabelsToNode); @@ -230,9 +227,8 @@ public void replaceLabelsOnNode(Map> replaceLabelsToNode) * will update running nodes resource */ public void activateNode(NodeId nodeId, Resource resource) { + writeLock.lock(); try { - writeLock.lock(); - // save if we have a node before Map before = cloneNodeMap(ImmutableSet.of(nodeId)); @@ -273,9 +269,8 @@ public void activateNode(NodeId nodeId, Resource resource) { * Following methods are used for setting if a node unregistered to RM */ public void deactivateNode(NodeId nodeId) { + writeLock.lock(); try { - writeLock.lock(); - // save if we have a node before Map before = cloneNodeMap(ImmutableSet.of(nodeId)); Node nm = getNMInNodeSet(nodeId); @@ -314,8 +309,8 @@ public void updateNodeResource(NodeId node, Resource newResource) { } public void reinitializeQueueLabels(Map> queueToLabels) { + writeLock.lock(); try { - writeLock.lock(); // clear before set this.queueCollections.clear(); @@ -347,8 +342,8 @@ public void reinitializeQueueLabels(Map> queueToLabels) { public Resource getQueueResource(String queueName, Set queueLabels, Resource clusterResource) { + readLock.lock(); try { - readLock.lock(); if (queueLabels.contains(ANY)) { return clusterResource; } @@ -369,8 +364,8 @@ public int getActiveNMCountPerLabel(String label) { if (label == null) { return 0; } + readLock.lock(); try { - readLock.lock(); RMNodeLabel labelInfo = labelCollections.get(label); return (labelInfo == null) ? 0 : labelInfo.getNumActiveNMs(); } finally { @@ -379,8 +374,8 @@ public int getActiveNMCountPerLabel(String label) { } public Set getLabelsOnNode(NodeId nodeId) { + readLock.lock(); try { - readLock.lock(); Set nodeLabels = getLabelsByNode(nodeId); return Collections.unmodifiableSet(nodeLabels); } finally { @@ -389,8 +384,8 @@ public int getActiveNMCountPerLabel(String label) { } public boolean containsNodeLabel(String label) { + readLock.lock(); try { - readLock.lock(); return label != null && (label.isEmpty() || labelCollections.containsKey(label)); } finally { @@ -522,8 +517,8 @@ public Resource getResourceByLabel(String label, Resource clusterResource) { if (label.equals(NO_LABEL)) { return noNodeLabel.getResource(); } + readLock.lock(); try { - readLock.lock(); RMNodeLabel nodeLabel = labelCollections.get(label); if (nodeLabel == null) { return Resources.none(); @@ -572,8 +567,8 @@ public void setRMContext(RMContext rmContext) { } public List pullRMNodeLabelsInfo() { + readLock.lock(); try { - readLock.lock(); List infos = new ArrayList(); for (Entry entry : labelCollections.entrySet()) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementManager.java index 74cf7ba..c334b0b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementManager.java @@ -44,8 +44,8 @@ public PlacementManager() { } public void updateRules(List rules) { + writeLock.lock(); try { - writeLock.lock(); this.rules = rules; } finally { writeLock.unlock(); @@ -54,10 +54,8 @@ public void updateRules(List rules) { public ApplicationPlacementContext placeApplication( ApplicationSubmissionContext asc, String user) throws YarnException { - + readLock.lock(); try { - readLock.lock(); - if (null == rules || rules.isEmpty()) { return null; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 696b39d..b0e1e26 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -1775,8 +1775,8 @@ public ReservationId getReservationId() { @Override public Map getLogAggregationReportsForApp() { + this.readLock.lock(); try { - this.readLock.lock(); if (!isLogAggregationFinished() && isAppInFinalState(this) && systemClock.getTime() > this.logAggregationStartTime + this.logAggregationStatusTimeout) { @@ -1800,8 +1800,8 @@ public ReservationId getReservationId() { } public void aggregateLogReport(NodeId nodeId, LogAggregationReport report) { + this.writeLock.lock(); try { - this.writeLock.lock(); if (this.logAggregationEnabled && !isLogAggregationFinished()) { LogAggregationReport curReport = this.logAggregationStatus.get(nodeId); boolean stateChangedToFinal = false; @@ -1850,8 +1850,8 @@ public void aggregateLogReport(NodeId nodeId, LogAggregationReport report) { @Override public LogAggregationStatus getLogAggregationStatusForAppReport() { + this.readLock.lock(); try { - this.readLock.lock(); if (! logAggregationEnabled) { return LogAggregationStatus.DISABLED; } @@ -2021,8 +2021,8 @@ private void updateLogAggregationStatus(NodeId nodeId) { } public String getLogAggregationFailureMessagesForNM(NodeId nodeId) { + this.readLock.lock(); try { - this.readLock.lock(); List failureMessages = this.logAggregationFailureMessagesForNMs.get(nodeId); if (failureMessages == null || failureMessages.isEmpty()) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index 7b04ae7..473aaeb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -1589,8 +1589,8 @@ public boolean shouldCountTowardsMaxAttemptRetry() { && this.getFinishTime() < (end - attemptFailuresValidityInterval)) { return false; } + this.readLock.lock(); try { - this.readLock.lock(); int exitStatus = getAMContainerExitStatus(); return !(exitStatus == ContainerExitStatus.PREEMPTED || exitStatus == ContainerExitStatus.ABORTED @@ -2275,8 +2275,8 @@ public RMAppAttemptMetrics getRMAppAttemptMetrics() { @Override public long getFinishTime() { + this.readLock.lock(); try { - this.readLock.lock(); return this.finishTime; } finally { this.readLock.unlock(); @@ -2284,8 +2284,8 @@ public long getFinishTime() { } private void setFinishTime(long finishTime) { + this.writeLock.lock(); try { - this.writeLock.lock(); this.finishTime = finishTime; } finally { this.writeLock.unlock(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptMetrics.java index 43e6e4d..49b69c1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptMetrics.java @@ -72,8 +72,8 @@ public RMAppAttemptMetrics(ApplicationAttemptId attemptId, } public void updatePreemptionInfo(Resource resource, RMContainer container) { + writeLock.lock(); try { - writeLock.lock(); resourcePreempted = Resources.addTo(resourcePreempted, resource); } finally { writeLock.unlock(); @@ -96,8 +96,8 @@ public void updatePreemptionInfo(Resource resource, RMContainer container) { } public Resource getResourcePreempted() { + readLock.lock(); try { - readLock.lock(); return Resource.newInstance(resourcePreempted); } finally { readLock.unlock(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java index ad30447..0a2fed0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java @@ -306,8 +306,8 @@ public SchedulerRequestKey getReservedSchedulerKey() { @Override public Resource getAllocatedResource() { + readLock.lock(); try { - readLock.lock(); return container.getResource(); } finally { readLock.unlock(); @@ -316,8 +316,8 @@ public Resource getAllocatedResource() { @Override public Resource getLastConfirmedResource() { + readLock.lock(); try { - readLock.lock(); return this.lastConfirmedResource; } finally { readLock.unlock(); @@ -346,8 +346,8 @@ public long getCreationTime() { @Override public long getFinishTime() { + readLock.lock(); try { - readLock.lock(); return finishTime; } finally { readLock.unlock(); @@ -356,8 +356,8 @@ public long getFinishTime() { @Override public String getDiagnosticsInfo() { + readLock.lock(); try { - readLock.lock(); if (finishedStatus != null) { return finishedStatus.getDiagnostics(); } else { @@ -370,8 +370,8 @@ public String getDiagnosticsInfo() { @Override public String getLogURL() { + readLock.lock(); try { - readLock.lock(); StringBuilder logURL = new StringBuilder(); logURL.append(WebAppUtils.getHttpSchemePrefix(rmContext .getYarnConfiguration())); @@ -386,8 +386,8 @@ public String getLogURL() { @Override public int getContainerExitStatus() { + readLock.lock(); try { - readLock.lock(); if (finishedStatus != null) { return finishedStatus.getExitStatus(); } else { @@ -400,8 +400,8 @@ public int getContainerExitStatus() { @Override public ContainerState getContainerState() { + readLock.lock(); try { - readLock.lock(); if (finishedStatus != null) { return finishedStatus.getState(); } else { @@ -414,8 +414,8 @@ public ContainerState getContainerState() { @Override public ContainerRequest getContainerRequest() { + readLock.lock(); try { - readLock.lock(); return containerRequestForRecovery; } finally { readLock.unlock(); @@ -438,8 +438,8 @@ public String toString() { @Override public boolean isAMContainer() { + readLock.lock(); try { - readLock.lock(); return isAMContainer; } finally { readLock.unlock(); @@ -447,8 +447,8 @@ public boolean isAMContainer() { } public void setAMContainer(boolean isAMContainer) { + writeLock.lock(); try { - writeLock.lock(); this.isAMContainer = isAMContainer; } finally { writeLock.unlock(); @@ -470,8 +470,9 @@ public void handle(RMContainerEvent event) { LOG.debug("Processing " + event.getContainerId() + " of type " + event .getType()); } + + writeLock.lock(); try { - writeLock.lock(); RMContainerState oldState = getState(); try { stateMachine.doTransition(event.getType(), event); @@ -809,8 +810,8 @@ public ContainerReport createContainerReport() { @Override public String getNodeHttpAddress() { + readLock.lock(); try { - readLock.lock(); if (container.getNodeHttpAddress() != null) { StringBuilder httpAddress = new StringBuilder(); httpAddress.append(WebAppUtils.getHttpSchemePrefix(rmContext @@ -893,8 +894,8 @@ public boolean isRemotelyAllocated() { @Override public Resource getAllocatedOrReservedResource() { + readLock.lock(); try { - readLock.lock(); if (getState().equals(RMContainerState.RESERVED)) { return getReservedResource(); } else { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index d33ee44..0fb2619 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -669,8 +669,8 @@ public void resetLastNodeHeartBeatResponse() { public void handle(RMNodeEvent event) { LOG.debug("Processing " + event.getNodeId() + " of type " + event.getType()); + writeLock.lock(); try { - writeLock.lock(); NodeState oldState = getState(); try { stateMachine.doTransition(event.getType(), event); @@ -1514,9 +1514,8 @@ private void handleLogAggregationStatus( @Override public List pullNewlyIncreasedContainers() { + writeLock.lock(); try { - writeLock.lock(); - if (nmReportedIncreasedContainers.isEmpty()) { return Collections.emptyList(); } else { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractResourceUsage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractResourceUsage.java index 664cb35..ad3bfb1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractResourceUsage.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractResourceUsage.java @@ -118,8 +118,8 @@ protected Resource _get(String label, ResourceType type) { return normalize(noLabelUsages.resArr.get(type.idx)); } + readLock.lock(); try { - readLock.lock(); UsageByLabel usage = usages.get(label); if (null == usage) { return Resources.none(); @@ -131,8 +131,8 @@ protected Resource _get(String label, ResourceType type) { } protected Resource _getAll(ResourceType type) { + readLock.lock(); try { - readLock.lock(); Resource allOfType = Resources.createResource(0); for (Map.Entry usageEntry : usages.entrySet()) { // all usages types are initialized @@ -159,8 +159,8 @@ private UsageByLabel getAndAddIfMissing(String label) { } protected void _set(String label, ResourceType type, Resource res) { + writeLock.lock(); try { - writeLock.lock(); UsageByLabel usage = getAndAddIfMissing(label); usage.resArr.set(type.idx, res); } finally { @@ -169,8 +169,8 @@ protected void _set(String label, ResourceType type, Resource res) { } protected void _inc(String label, ResourceType type, Resource res) { + writeLock.lock(); try { - writeLock.lock(); UsageByLabel usage = getAndAddIfMissing(label); usage.resArr.set(type.idx, Resources.add(usage.resArr.get(type.idx), res)); @@ -180,8 +180,8 @@ protected void _inc(String label, ResourceType type, Resource res) { } protected void _dec(String label, ResourceType type, Resource res) { + writeLock.lock(); try { - writeLock.lock(); UsageByLabel usage = getAndAddIfMissing(label); usage.resArr.set(type.idx, Resources.subtract(usage.resArr.get(type.idx), res)); @@ -192,8 +192,8 @@ protected void _dec(String label, ResourceType type, Resource res) { @Override public String toString() { + readLock.lock(); try { - readLock.lock(); return usages.toString(); } finally { readLock.unlock(); @@ -201,8 +201,8 @@ public String toString() { } public Set getNodePartitionsSet() { + readLock.lock(); try { - readLock.lock(); return usages.keySet(); } finally { readLock.unlock(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index a798b97..aea3c64 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -345,8 +345,8 @@ public long getLastNodeUpdateTime() { protected void containerLaunchedOnNode( ContainerId containerId, SchedulerNode node) { + readLock.lock(); try { - readLock.lock(); // Get the application for the finished container SchedulerApplicationAttempt application = getCurrentAttemptForContainer(containerId); @@ -487,8 +487,8 @@ private void killOrphanContainerOnNode(RMNode node, public void recoverContainersOnNode(List containerReports, RMNode nm) { + writeLock.lock(); try { - writeLock.lock(); if (!rmContext.isWorkPreservingRecoveryEnabled() || containerReports == null || (containerReports != null && containerReports.isEmpty())) { @@ -769,8 +769,8 @@ public N getSchedulerNode(NodeId nodeId) { @Override public void moveAllApps(String sourceQueue, String destQueue) throws YarnException { + writeLock.lock(); try { - writeLock.lock(); // check if destination queue is a valid leaf queue try { getQueueInfo(destQueue, false, false); @@ -800,8 +800,8 @@ public void moveAllApps(String sourceQueue, String destQueue) @Override public void killAllAppsInQueue(String queueName) throws YarnException { + writeLock.lock(); try { - writeLock.lock(); // check if queue is a valid List apps = getAppsInQueue(queueName); if (apps == null) { @@ -826,8 +826,8 @@ public void killAllAppsInQueue(String queueName) */ public void updateNodeResource(RMNode nm, ResourceOption resourceOption) { + writeLock.lock(); try { - writeLock.lock(); SchedulerNode node = getSchedulerNode(nm.getNodeID()); Resource newResource = resourceOption.getResource(); Resource oldResource = node.getTotalResource(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index ca7d9ce..8f8f3bc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -134,8 +134,8 @@ public long getNewContainerId() { } public String getQueueName() { + this.readLock.lock(); try { - this.readLock.lock(); return queue.getQueueName(); } finally { this.readLock.unlock(); @@ -464,8 +464,8 @@ public boolean getAndResetBlacklistChanged() { */ public List getAllResourceRequests() { List ret = new ArrayList<>(); + this.readLock.lock(); try { - this.readLock.lock(); for (AppPlacementAllocator ap : schedulerKeyToAppPlacementAllocator .values()) { ret.addAll(ap.getResourceRequests().values()); @@ -482,8 +482,8 @@ public boolean getAndResetBlacklistChanged() { */ public List getAllSchedulingRequests() { List ret = new ArrayList<>(); + this.readLock.lock(); try { - this.readLock.lock(); schedulerKeyToAppPlacementAllocator.values().stream() .filter(ap -> ap.getSchedulingRequest() != null) .forEach(ap -> ret.add(ap.getSchedulingRequest())); @@ -494,8 +494,8 @@ public boolean getAndResetBlacklistChanged() { } public PendingAsk getNextPendingAsk() { + readLock.lock(); try { - readLock.lock(); SchedulerRequestKey firstRequestKey = schedulerKeys.first(); return getPendingAsk(firstRequestKey, ResourceRequest.ANY); } finally { @@ -510,8 +510,8 @@ public PendingAsk getPendingAsk(SchedulerRequestKey schedulerKey) { public PendingAsk getPendingAsk(SchedulerRequestKey schedulerKey, String resourceName) { + this.readLock.lock(); try { - this.readLock.lock(); AppPlacementAllocator ap = schedulerKeyToAppPlacementAllocator.get( schedulerKey); return (ap == null) ? PendingAsk.ZERO : ap.getPendingAsk(resourceName); @@ -546,9 +546,8 @@ public boolean isPlaceBlacklisted(String resourceName, public ContainerRequest allocate(NodeType type, SchedulerNode node, SchedulerRequestKey schedulerKey, Container containerAllocated) { + writeLock.lock(); try { - writeLock.lock(); - if (null != containerAllocated) { updateMetricsForAllocatedContainer(type, node, containerAllocated); } @@ -567,8 +566,8 @@ public void checkForDeactivation() { } public void move(Queue newQueue) { + this.writeLock.lock(); try { - this.writeLock.lock(); QueueMetrics oldMetrics = queue.getMetrics(); QueueMetrics newMetrics = newQueue.getMetrics(); for (AppPlacementAllocator ap : schedulerKeyToAppPlacementAllocator @@ -606,8 +605,8 @@ public void move(Queue newQueue) { public void stop() { // clear pending resources metrics for the application + this.writeLock.lock(); try { - this.writeLock.lock(); QueueMetrics metrics = queue.getMetrics(); for (AppPlacementAllocator ap : schedulerKeyToAppPlacementAllocator .values()) { @@ -633,8 +632,8 @@ public void stop() { } public void setQueue(Queue queue) { + this.writeLock.lock(); try { - this.writeLock.lock(); this.queue = queue; } finally { this.writeLock.unlock(); @@ -662,8 +661,8 @@ public void recoverContainer(RMContainer rmContainer, String partition) { if (rmContainer.getExecutionType() != ExecutionType.GUARANTEED) { return; } + this.writeLock.lock(); try { - this.writeLock.lock(); QueueMetrics metrics = queue.getMetrics(); if (pending) { // If there was any container to recover, the application was @@ -690,8 +689,8 @@ public void recoverContainer(RMContainer rmContainer, String partition) { */ public boolean checkAllocation(NodeType type, SchedulerNode node, SchedulerRequestKey schedulerKey) { + readLock.lock(); try { - readLock.lock(); AppPlacementAllocator ap = schedulerKeyToAppPlacementAllocator.get( schedulerKey); if (null == ap) { @@ -751,8 +750,8 @@ public static void updateMetrics(ApplicationId applicationId, NodeType type, */ public boolean canDelayTo( SchedulerRequestKey schedulerKey, String resourceName) { + this.readLock.lock(); try { - this.readLock.lock(); AppPlacementAllocator ap = schedulerKeyToAppPlacementAllocator.get(schedulerKey); return (ap == null) || ap.canDelayTo(resourceName); @@ -772,8 +771,8 @@ public boolean canDelayTo( */ public boolean precheckNode(SchedulerRequestKey schedulerKey, SchedulerNode schedulerNode, SchedulingMode schedulingMode) { + this.readLock.lock(); try { - this.readLock.lock(); AppPlacementAllocator ap = schedulerKeyToAppPlacementAllocator.get(schedulerKey); return (ap != null) && ap.precheckNode(schedulerNode, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java index 37958de..c46c911 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java @@ -73,8 +73,8 @@ public void setUsed(Resource res) { } public void copyAllUsed(AbstractResourceUsage other) { + writeLock.lock(); try { - writeLock.lock(); for (Entry entry : other.usages.entrySet()) { setUsed(entry.getKey(), Resources.clone(entry.getValue().getUsed())); } @@ -285,8 +285,8 @@ public void setUserAMLimit(String label, Resource res) { } public Resource getCachedDemand(String label) { + readLock.lock(); try { - readLock.lock(); Resource demand = Resources.createResource(0); Resources.addTo(demand, getCachedUsed(label)); Resources.addTo(demand, getCachedPending(label)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index 0439a3f..1135f7d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -253,8 +253,8 @@ public void setOpportunisticContainerContext( * @return live containers of the application */ public Collection getLiveContainers() { + readLock.lock(); try { - readLock.lock(); return new ArrayList<>(liveContainers.values()); } finally { readLock.unlock(); @@ -307,8 +307,8 @@ public long getNewContainerId() { public PendingAsk getPendingAsk( SchedulerRequestKey schedulerKey, String resourceName) { + readLock.lock(); try { - readLock.lock(); return appSchedulingInfo.getPendingAsk(schedulerKey, resourceName); } finally { readLock.unlock(); @@ -321,8 +321,8 @@ public int getOutstandingAsksCount(SchedulerRequestKey schedulerKey) { public int getOutstandingAsksCount(SchedulerRequestKey schedulerKey, String resourceName) { + readLock.lock(); try { - readLock.lock(); AppPlacementAllocator ap = appSchedulingInfo.getAppPlacementAllocator( schedulerKey); return ap == null ? 0 : ap.getOutstandingAsksCount(resourceName); @@ -369,8 +369,8 @@ public RMContainer getRMContainer(ContainerId id) { public void addRMContainer( ContainerId id, RMContainer rmContainer) { + writeLock.lock(); try { - writeLock.lock(); if (!getApplicationAttemptId().equals( rmContainer.getApplicationAttemptId()) && !liveContainers.containsKey(id)) { @@ -393,8 +393,8 @@ public void addRMContainer( } public void removeRMContainer(ContainerId containerId) { + writeLock.lock(); try { - writeLock.lock(); RMContainer rmContainer = liveContainers.remove(containerId); if (rmContainer != null) { if (rmContainer.getExecutionType() == ExecutionType.OPPORTUNISTIC) { @@ -446,8 +446,8 @@ public Queue getQueue() { public boolean updateResourceRequests( List requests) { + writeLock.lock(); try { - writeLock.lock(); if (!isStopped) { return appSchedulingInfo.updateResourceRequests(requests, false); } @@ -463,8 +463,8 @@ public boolean updateSchedulingRequests( return false; } + writeLock.lock(); try { - writeLock.lock(); if (!isStopped) { return appSchedulingInfo.updateSchedulingRequests(requests, false); } @@ -476,8 +476,8 @@ public boolean updateSchedulingRequests( public void recoverResourceRequestsForContainer( ContainerRequest containerRequest) { + writeLock.lock(); try { - writeLock.lock(); if (!isStopped) { appSchedulingInfo.updateResourceRequests( containerRequest.getResourceRequests(), true); @@ -488,8 +488,8 @@ public void recoverResourceRequestsForContainer( } public void stop(RMAppAttemptState rmAppAttemptFinalState) { + writeLock.lock(); try { - writeLock.lock(); // Cleanup all scheduling information isStopped = true; appSchedulingInfo.stop(); @@ -508,8 +508,8 @@ public boolean isStopped() { */ public List getReservedContainers() { List list = new ArrayList<>(); + readLock.lock(); try { - readLock.lock(); for (Entry> e : this.reservedContainers.entrySet()) { list.addAll(e.getValue().values()); @@ -524,8 +524,8 @@ public boolean isStopped() { public boolean reserveIncreasedContainer(SchedulerNode node, SchedulerRequestKey schedulerKey, RMContainer rmContainer, Resource reservedResource) { + writeLock.lock(); try { - writeLock.lock(); if (commonReserve(node, schedulerKey, rmContainer, reservedResource)) { attemptResourceUsage.incReserved(node.getPartition(), reservedResource); // succeeded @@ -573,8 +573,8 @@ private boolean commonReserve(SchedulerNode node, public RMContainer reserve(SchedulerNode node, SchedulerRequestKey schedulerKey, RMContainer rmContainer, Container container) { + writeLock.lock(); try { - writeLock.lock(); // Create RMContainer if necessary if (rmContainer == null) { rmContainer = new RMContainerImpl(container, schedulerKey, @@ -617,8 +617,8 @@ public Resource getHeadroom() { public int getNumReservedContainers( SchedulerRequestKey schedulerKey) { + readLock.lock(); try { - readLock.lock(); Map map = this.reservedContainers.get( schedulerKey); return (map == null) ? 0 : map.size(); @@ -630,8 +630,8 @@ public int getNumReservedContainers( @SuppressWarnings("unchecked") public void containerLaunchedOnNode(ContainerId containerId, NodeId nodeId) { + writeLock.lock(); try { - writeLock.lock(); // Inform the container RMContainer rmContainer = getRMContainer(containerId); if (rmContainer == null) { @@ -650,8 +650,8 @@ public void containerLaunchedOnNode(ContainerId containerId, public void showRequests() { if (LOG.isDebugEnabled()) { + readLock.lock(); try { - readLock.lock(); for (SchedulerRequestKey schedulerKey : getSchedulerKeys()) { AppPlacementAllocator ap = getAppPlacementAllocator(schedulerKey); if (ap != null && @@ -762,8 +762,8 @@ private void updateNMToken(Container container) { * . */ List pullContainersToTransfer() { + writeLock.lock(); try { - writeLock.lock(); recoveredPreviousAttemptContainers.clear(); return new ArrayList<>(liveContainers.values()); } finally { @@ -777,8 +777,8 @@ private void updateNMToken(Container container) { * AllocateResponse#containersFromPreviousAttempts. */ public List pullPreviousAttemptContainers() { + writeLock.lock(); try { - writeLock.lock(); if (recoveredPreviousAttemptContainers.isEmpty()) { return null; } @@ -796,8 +796,8 @@ private void updateNMToken(Container container) { // some reason like DNS unavailable, do not return this container and keep it // in the newlyAllocatedContainers waiting to be refetched. public List pullNewlyAllocatedContainers() { + writeLock.lock(); try { - writeLock.lock(); List returnContainerList = new ArrayList( newlyAllocatedContainers.size()); @@ -912,8 +912,9 @@ protected synchronized void addToNewlyAllocatedContainers( || ContainerUpdateType.PROMOTE_EXECUTION_TYPE == updateTpe)) { return updatedContainers; } + + writeLock.lock(); try { - writeLock.lock(); Iterator> i = newlyUpdatedContainers.entrySet().iterator(); while (i.hasNext()) { @@ -960,8 +961,8 @@ protected synchronized void addToNewlyAllocatedContainers( } public List pullUpdatedNMTokens() { + writeLock.lock(); try { - writeLock.lock(); List returnList = new ArrayList<>(updatedNMTokens); updatedNMTokens.clear(); return returnList; @@ -979,8 +980,8 @@ public boolean isWaitingForAMContainer() { public void updateBlacklist(List blacklistAdditions, List blacklistRemovals) { + writeLock.lock(); try { - writeLock.lock(); if (!isStopped) { if (isWaitingForAMContainer()) { // The request is for the AM-container, and the AM-container is @@ -999,8 +1000,8 @@ public void updateBlacklist(List blacklistAdditions, } public boolean isPlaceBlacklisted(String resourceName) { + readLock.lock(); try { - readLock.lock(); boolean forAMContainer = isWaitingForAMContainer(); return this.appSchedulingInfo.isPlaceBlacklisted(resourceName, forAMContainer); @@ -1103,8 +1104,8 @@ private AggregateAppResourceUsage getRunningAggregateAppResourceUsage() { } public ApplicationResourceUsageReport getResourceUsageReport() { + writeLock.lock(); try { - writeLock.lock(); AggregateAppResourceUsage runningResourceUsage = getRunningAggregateAppResourceUsage(); Resource usedResourceClone = Resources.clone( @@ -1154,8 +1155,8 @@ public ApplicationResourceUsageReport getResourceUsageReport() { public void transferStateFromPreviousAttempt( SchedulerApplicationAttempt appAttempt) { + writeLock.lock(); try { - writeLock.lock(); this.liveContainers = appAttempt.getLiveContainersMap(); // this.reReservations = appAttempt.reReservations; this.attemptResourceUsage.copyAllUsed(appAttempt.attemptResourceUsage); @@ -1172,8 +1173,8 @@ public void transferStateFromPreviousAttempt( } public void move(Queue newQueue) { + writeLock.lock(); try { - writeLock.lock(); QueueMetrics oldMetrics = queue.getMetrics(); QueueMetrics newMetrics = newQueue.getMetrics(); String newQueueName = newQueue.getQueueName(); @@ -1209,8 +1210,8 @@ public void move(Queue newQueue) { public void recoverContainer(SchedulerNode node, RMContainer rmContainer) { + writeLock.lock(); try { - writeLock.lock(); // recover app scheduling info appSchedulingInfo.recoverContainer(rmContainer, node.getPartition()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractAutoCreatedLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractAutoCreatedLeafQueue.java index ac97d72..f351119 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractAutoCreatedLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractAutoCreatedLeafQueue.java @@ -77,8 +77,8 @@ public void setEntitlement(QueueEntitlement entitlement) */ public void setEntitlement(String nodeLabel, QueueEntitlement entitlement) throws SchedulerDynamicEditException { + writeLock.lock(); try { - writeLock.lock(); float capacity = entitlement.getCapacity(); if (capacity < 0 || capacity > 1.0f) { throw new SchedulerDynamicEditException( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index caa88cf..44c6b24 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -278,8 +278,8 @@ public boolean hasAccess(QueueACL acl, UserGroupInformation user) { * @param maximumCapacity new max capacity */ void setMaxCapacity(float maximumCapacity) { + writeLock.lock(); try { - writeLock.lock(); // Sanity check CSQueueUtils.checkMaxCapacity(getQueueName(), queueCapacities.getCapacity(), maximumCapacity); @@ -300,8 +300,8 @@ void setMaxCapacity(float maximumCapacity) { * @param maximumCapacity new max capacity */ void setMaxCapacity(String nodeLabel, float maximumCapacity) { + writeLock.lock(); try { - writeLock.lock(); // Sanity check CSQueueUtils.checkMaxCapacity(getQueueName(), queueCapacities.getCapacity(nodeLabel), maximumCapacity); @@ -332,8 +332,8 @@ protected void setupQueueConfigs(Resource clusterResource, CapacitySchedulerConfiguration configuration) throws IOException { + writeLock.lock(); try { - writeLock.lock(); // get labels this.accessibleLabels = configuration.getAccessibleNodeLabels(getQueuePath()); @@ -749,8 +749,8 @@ public Resource getMinimumAllocation() { void allocateResource(Resource clusterResource, Resource resource, String nodePartition) { + writeLock.lock(); try { - writeLock.lock(); queueUsage.incUsed(nodePartition, resource); ++numContainers; @@ -764,8 +764,8 @@ void allocateResource(Resource clusterResource, protected void releaseResource(Resource clusterResource, Resource resource, String nodePartition) { + writeLock.lock(); try { - writeLock.lock(); queueUsage.decUsed(nodePartition, resource); CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource, @@ -784,8 +784,8 @@ public boolean getReservationContinueLooking() { @Private public Map getACLs() { + readLock.lock(); try { - readLock.lock(); return acls; } finally { readLock.unlock(); @@ -937,8 +937,8 @@ public boolean hasChildQueues() { boolean canAssignToThisQueue(Resource clusterResource, String nodePartition, ResourceLimits currentResourceLimits, Resource resourceCouldBeUnreserved, SchedulingMode schedulingMode) { + readLock.lock(); try { - readLock.lock(); // Get current limited resource: // - When doing RESPECT_PARTITION_EXCLUSIVITY allocation, we will respect // queues' max capacity. @@ -1202,9 +1202,8 @@ public boolean accept(Resource cluster, Resource netAllocated = Resources.subtract(required, request.getTotalReleasedResource()); + readLock.lock(); try { - readLock.lock(); - String partition = schedulerContainer.getNodePartition(); Resource maxResourceLimit; if (allocation.getSchedulingMode() @@ -1253,8 +1252,8 @@ public void updateQueueState(QueueState queueState) { @Override public void activeQueue() throws YarnException { + this.writeLock.lock(); try { - this.writeLock.lock(); if (getState() == QueueState.RUNNING) { LOG.info("The specified queue:" + queueName + " is already in the RUNNING state."); @@ -1277,8 +1276,8 @@ public void activeQueue() throws YarnException { } protected void appFinished() { + this.writeLock.lock(); try { - this.writeLock.lock(); if (getState() == QueueState.DRAINING) { if (getNumApplications() == 0) { updateQueueState(QueueState.STOPPED); @@ -1300,8 +1299,8 @@ public Priority getPriority() { } public void recoverDrainingState() { + this.writeLock.lock(); try { - this.writeLock.lock(); if (getState() == QueueState.STOPPED) { updateQueueState(QueueState.DRAINING); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractManagedParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractManagedParentQueue.java index 9d38f79..eb3221e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractManagedParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractManagedParentQueue.java @@ -54,9 +54,8 @@ public AbstractManagedParentQueue(CapacitySchedulerContext cs, @Override public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource) throws IOException { + writeLock.lock(); try { - writeLock.lock(); - // Set new configs setupQueueConfigs(clusterResource); @@ -72,8 +71,8 @@ public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource) */ public void addChildQueue(CSQueue childQueue) throws SchedulerDynamicEditException, IOException { + writeLock.lock(); try { - writeLock.lock(); if (childQueue.getCapacity() > 0) { throw new SchedulerDynamicEditException( "Queue " + childQueue + " being added has non zero capacity."); @@ -95,8 +94,8 @@ public void addChildQueue(CSQueue childQueue) */ public void removeChildQueue(CSQueue childQueue) throws SchedulerDynamicEditException { + writeLock.lock(); try { - writeLock.lock(); if (childQueue.getCapacity() > 0) { throw new SchedulerDynamicEditException( "Queue " + childQueue + " being removed has non zero capacity."); @@ -124,8 +123,8 @@ public void removeChildQueue(CSQueue childQueue) public CSQueue removeChildQueue(String childQueueName) throws SchedulerDynamicEditException { CSQueue childQueue; + writeLock.lock(); try { - writeLock.lock(); childQueue = this.csContext.getCapacitySchedulerQueueManager().getQueue( childQueueName); if (childQueue != null) { @@ -141,8 +140,8 @@ public CSQueue removeChildQueue(String childQueueName) } protected float sumOfChildCapacities() { + writeLock.lock(); try { - writeLock.lock(); float ret = 0; for (CSQueue l : childQueues) { ret += l.getCapacity(); @@ -154,8 +153,8 @@ protected float sumOfChildCapacities() { } protected float sumOfChildAbsCapacities() { + writeLock.lock(); try { - writeLock.lock(); float ret = 0; for (CSQueue l : childQueues) { ret += l.getAbsoluteCapacity(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java index e12b55e..b194ad8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java @@ -49,9 +49,8 @@ public AutoCreatedLeafQueue(CapacitySchedulerContext cs, String queueName, @Override public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource) throws IOException { + writeLock.lock(); try { - writeLock.lock(); - validate(newlyParsedQueue); ManagedParentQueue managedParentQueue = (ManagedParentQueue) parent; @@ -72,8 +71,8 @@ public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource) public void reinitializeFromTemplate(AutoCreatedLeafQueueConfig leafQueueTemplate) throws SchedulerDynamicEditException, IOException { + writeLock.lock(); try { - writeLock.lock(); // TODO: // reinitialize only capacities for now since 0 capacity updates diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 8ecc2f0..6832a11 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -317,8 +317,8 @@ public void setRMContext(RMContext rmContext) { @VisibleForTesting void initScheduler(Configuration configuration) throws IOException { + writeLock.lock(); try { - writeLock.lock(); String confProviderStr = configuration.get( YarnConfiguration.SCHEDULER_CONFIGURATION_STORE_CLASS, YarnConfiguration.DEFAULT_CONFIGURATION_STORE); @@ -416,8 +416,8 @@ void initScheduler(Configuration configuration) throws } private void startSchedulerThreads() { + writeLock.lock(); try { - writeLock.lock(); activitiesManager.start(); if (scheduleAsynchronously) { Preconditions.checkNotNull(asyncSchedulerThreads, @@ -450,8 +450,8 @@ public void serviceStart() throws Exception { @Override public void serviceStop() throws Exception { + writeLock.lock(); try { - writeLock.lock(); this.activitiesManager.stop(); if (scheduleAsynchronously && asyncSchedulerThreads != null) { for (Thread t : asyncSchedulerThreads) { @@ -474,8 +474,8 @@ public void serviceStop() throws Exception { @Override public void reinitialize(Configuration newConf, RMContext rmContext) throws IOException { + writeLock.lock(); try { - writeLock.lock(); Configuration configuration = new Configuration(newConf); CapacitySchedulerConfiguration oldConf = this.conf; this.conf = csConfProvider.loadConfiguration(configuration); @@ -651,9 +651,8 @@ public void run() { try { ResourceCommitRequest request = backlogs.take(); - + cs.writeLock.lock(); try { - cs.writeLock.lock(); cs.tryCommit(cs.getClusterResource(), request, true); } finally { cs.writeLock.unlock(); @@ -679,8 +678,8 @@ public int getPendingBacklogs() { @VisibleForTesting public PlacementRule getUserGroupMappingPlacementRule() throws IOException { + readLock.lock(); try { - readLock.lock(); UserGroupMappingPlacementRule ugRule = new UserGroupMappingPlacementRule(); ugRule.initialize(this); return ugRule; @@ -690,8 +689,8 @@ public PlacementRule getUserGroupMappingPlacementRule() throws IOException { } public PlacementRule getAppNameMappingPlacementRule() throws IOException { + readLock.lock(); try { - readLock.lock(); AppNameMappingPlacementRule anRule = new AppNameMappingPlacementRule(); anRule.initialize(this); return anRule; @@ -791,8 +790,8 @@ public CSQueue getQueue(String queueName) { private void addApplicationOnRecovery(ApplicationId applicationId, String queueName, String user, Priority priority, ApplicationPlacementContext placementContext) { + writeLock.lock(); try { - writeLock.lock(); //check if the queue needs to be auto-created during recovery CSQueue queue = getOrCreateQueueFromPlacementContext(applicationId, user, queueName, placementContext, true); @@ -915,8 +914,8 @@ private CSQueue getOrCreateQueueFromPlacementContext(ApplicationId private void addApplication(ApplicationId applicationId, String queueName, String user, Priority priority, ApplicationPlacementContext placementContext) { + writeLock.lock(); try { - writeLock.lock(); if (isSystemAppsLimitReached()) { String message = "Maximum system application limit reached," + "cannot accept submission of application: " + applicationId; @@ -1014,8 +1013,8 @@ private void addApplicationAttempt( ApplicationAttemptId applicationAttemptId, boolean transferStateFromPreviousAttempt, boolean isAttemptRecovering) { + writeLock.lock(); try { - writeLock.lock(); SchedulerApplication application = applications.get( applicationAttemptId.getApplicationId()); if (application == null) { @@ -1067,8 +1066,8 @@ private void addApplicationAttempt( private void doneApplication(ApplicationId applicationId, RMAppState finalState) { + writeLock.lock(); try { - writeLock.lock(); SchedulerApplication application = applications.get( applicationId); if (application == null) { @@ -1094,8 +1093,8 @@ private void doneApplication(ApplicationId applicationId, private void doneApplicationAttempt( ApplicationAttemptId applicationAttemptId, RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers) { + writeLock.lock(); try { - writeLock.lock(); LOG.info("Application Attempt " + applicationAttemptId + " is done." + " finalState=" + rmAppAttemptFinalState); @@ -1209,8 +1208,8 @@ public Allocation allocate(ApplicationAttemptId applicationAttemptId, // make sure we aren't stopping/removing the application // when the allocate comes in + application.getWriteLock().lock(); try { - application.getWriteLock().lock(); if (application.isStopped()) { return EMPTY_ALLOCATION; } @@ -1287,8 +1286,8 @@ public QueueInfo getQueueInfo(String queueName, @Override protected void nodeUpdate(RMNode rmNode) { long begin = System.nanoTime(); + readLock.lock(); try { - readLock.lock(); setLastNodeUpdateTime(Time.now()); super.nodeUpdate(rmNode); } finally { @@ -1297,8 +1296,8 @@ protected void nodeUpdate(RMNode rmNode) { // Try to do scheduling if (!scheduleAsynchronously) { + writeLock.lock(); try { - writeLock.lock(); ActivitiesLogger.NODE.startNodeUpdateRecording(activitiesManager, rmNode.getNodeID()); @@ -1324,8 +1323,8 @@ protected void nodeUpdate(RMNode rmNode) { */ private void updateNodeAndQueueResource(RMNode nm, ResourceOption resourceOption) { + writeLock.lock(); try { - writeLock.lock(); updateNodeResource(nm, resourceOption); Resource clusterResource = getClusterResource(); getRootQueue().updateClusterResource(clusterResource, @@ -1912,8 +1911,8 @@ public void handle(SchedulerEvent event) { private void updateNodeAttributes( NodeAttributesUpdateSchedulerEvent attributeUpdateEvent) { + writeLock.lock(); try { - writeLock.lock(); for (Entry> entry : attributeUpdateEvent .getUpdatedNodeToAttributes().entrySet()) { String hostname = entry.getKey(); @@ -1939,8 +1938,8 @@ private void updateAttributesOnNode(List nodeIds, */ private void updateNodeLabelsAndQueueResource( NodeLabelsUpdateSchedulerEvent labelUpdateEvent) { + writeLock.lock(); try { - writeLock.lock(); Set updateLabels = new HashSet(); for (Entry> entry : labelUpdateEvent .getUpdatedNodeToLabels().entrySet()) { @@ -1977,8 +1976,8 @@ private void refreshLabelToNodeCache(Set updateLabels) { } private void addNode(RMNode nodeManager) { + writeLock.lock(); try { - writeLock.lock(); FiCaSchedulerNode schedulerNode = new FiCaSchedulerNode(nodeManager, usePortForNodeName, nodeManager.getNodeLabels()); nodeTracker.addNode(schedulerNode); @@ -2014,8 +2013,8 @@ private void addNode(RMNode nodeManager) { } private void removeNode(RMNode nodeInfo) { + writeLock.lock(); try { - writeLock.lock(); // update this node to node label manager if (labelManager != null) { labelManager.deactivateNode(nodeInfo.getNodeID()); @@ -2159,8 +2158,8 @@ public void killContainer(RMContainer container) { public void markContainerForKillable( RMContainer killableContainer) { + writeLock.lock(); try { - writeLock.lock(); if (LOG.isDebugEnabled()) { LOG.debug(SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE + ": container" + killableContainer.toString()); @@ -2195,8 +2194,8 @@ public void markContainerForKillable( private void markContainerForNonKillable( RMContainer nonKillableContainer) { + writeLock.lock(); try { - writeLock.lock(); if (LOG.isDebugEnabled()) { LOG.debug( SchedulerEventType.MARK_CONTAINER_FOR_NONKILLABLE + ": container" @@ -2264,8 +2263,8 @@ private String getDefaultReservationQueueName(String planQueueName) { private String resolveReservationQueueName(String queueName, ApplicationId applicationId, ReservationId reservationID, boolean isRecovering) { + readLock.lock(); try { - readLock.lock(); CSQueue queue = getQueue(queueName); // Check if the queue is a plan queue if ((queue == null) || !(queue instanceof PlanQueue)) { @@ -2315,8 +2314,8 @@ private String resolveReservationQueueName(String queueName, @Override public void removeQueue(String queueName) throws SchedulerDynamicEditException { + writeLock.lock(); try { - writeLock.lock(); LOG.info("Removing queue: " + queueName); CSQueue q = this.getQueue(queueName); if (!(AbstractAutoCreatedLeafQueue.class.isAssignableFrom( @@ -2349,8 +2348,8 @@ public void removeQueue(String queueName) @Override public void addQueue(Queue queue) throws SchedulerDynamicEditException, IOException { + writeLock.lock(); try { - writeLock.lock(); if (queue == null) { throw new SchedulerDynamicEditException( "Queue specified is null. Should be an implementation of " @@ -2387,8 +2386,8 @@ public void addQueue(Queue queue) @Override public void setEntitlement(String inQueue, QueueEntitlement entitlement) throws YarnException { + writeLock.lock(); try { - writeLock.lock(); LeafQueue queue = this.queueManager.getAndCheckLeafQueue(inQueue); AbstractManagedParentQueue parent = (AbstractManagedParentQueue) queue.getParent(); @@ -2424,8 +2423,8 @@ public void setEntitlement(String inQueue, QueueEntitlement entitlement) @Override public String moveApplication(ApplicationId appId, String targetQueueName) throws YarnException { + writeLock.lock(); try { - writeLock.lock(); SchedulerApplication application = applications.get(appId); if (application == null) { @@ -2476,8 +2475,8 @@ public String moveApplication(ApplicationId appId, @Override public void preValidateMoveApplication(ApplicationId appId, String newQueue) throws YarnException { + writeLock.lock(); try { - writeLock.lock(); SchedulerApplication application = applications.get(appId); if (application == null) { @@ -2599,8 +2598,8 @@ private String handleMoveToPlanQueue(String targetQueueName) { public Priority checkAndGetApplicationPriority( Priority priorityRequestedByApp, UserGroupInformation user, String queueName, ApplicationId applicationId) throws YarnException { + readLock.lock(); try { - readLock.lock(); Priority appPriority = priorityRequestedByApp; // Verify the scenario where priority is null from submissionContext. @@ -2655,8 +2654,8 @@ public Priority updateApplicationPriority(Priority newPriority, ApplicationId applicationId, SettableFuture future, UserGroupInformation user) throws YarnException { + writeLock.lock(); try { - writeLock.lock(); Priority appPriority = null; SchedulerApplication application = applications .get(applicationId); @@ -3060,9 +3059,8 @@ public CapacitySchedulerQueueManager getCapacitySchedulerQueueManager() { */ public boolean moveReservedContainer(RMContainer toBeMovedContainer, FiCaSchedulerNode targetNode) { + writeLock.lock(); try { - writeLock.lock(); - if (LOG.isDebugEnabled()) { LOG.debug("Trying to move container=" + toBeMovedContainer + " to node=" + targetNode.getNodeID()); @@ -3116,8 +3114,8 @@ public boolean moveReservedContainer(RMContainer toBeMovedContainer, @Override public long checkAndGetApplicationLifetime(String queueName, long lifetimeRequestedByApp) { + readLock.lock(); try { - readLock.lock(); CSQueue queue = getQueue(queueName); if (queue == null || !(queue instanceof LeafQueue)) { return lifetimeRequestedByApp; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 3920987..d2742bf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -168,8 +168,8 @@ protected void setupQueueConfigs(Resource clusterResource) protected void setupQueueConfigs(Resource clusterResource, CapacitySchedulerConfiguration conf) throws IOException { + writeLock.lock(); try { - writeLock.lock(); CapacitySchedulerConfiguration schedConf = csContext.getConfiguration(); super.setupQueueConfigs(clusterResource, conf); @@ -401,8 +401,8 @@ void setUserLimitFactor(float userLimitFactor) { @Override public int getNumApplications() { + readLock.lock(); try { - readLock.lock(); return getNumPendingApplications() + getNumActiveApplications(); } finally { readLock.unlock(); @@ -410,8 +410,8 @@ public int getNumApplications() { } public int getNumPendingApplications() { + readLock.lock(); try { - readLock.lock(); return pendingOrderingPolicy.getNumSchedulableEntities(); } finally { readLock.unlock(); @@ -419,8 +419,8 @@ public int getNumPendingApplications() { } public int getNumActiveApplications() { + readLock.lock(); try { - readLock.lock(); return orderingPolicy.getNumSchedulableEntities(); } finally { readLock.unlock(); @@ -429,8 +429,8 @@ public int getNumActiveApplications() { @Private public int getNumPendingApplications(String user) { + readLock.lock(); try { - readLock.lock(); User u = getUser(user); if (null == u) { return 0; @@ -443,8 +443,8 @@ public int getNumPendingApplications(String user) { @Private public int getNumActiveApplications(String user) { + readLock.lock(); try { - readLock.lock(); User u = getUser(user); if (null == u) { return 0; @@ -475,8 +475,8 @@ public QueueInfo getQueueInfo( @Override public List getQueueUserAclInfo(UserGroupInformation user) { + readLock.lock(); try { - readLock.lock(); QueueUserACLInfo userAclInfo = recordFactory.newRecordInstance( QueueUserACLInfo.class); List operations = new ArrayList<>(); @@ -496,8 +496,8 @@ public QueueInfo getQueueInfo( } public String toString() { + readLock.lock(); try { - readLock.lock(); return queueName + ": " + "capacity=" + queueCapacities.getCapacity() + ", " + "absoluteCapacity=" + queueCapacities.getAbsoluteCapacity() + ", " + "usedResources=" + queueUsage.getUsed() + ", " @@ -521,8 +521,8 @@ public User getUser(String userName) { @Private public List getPriorityACLs() { + readLock.lock(); try { - readLock.lock(); return new ArrayList<>(priorityAcls); } finally { readLock.unlock(); @@ -534,8 +534,8 @@ protected void reinitialize( CapacitySchedulerConfiguration configuration) throws IOException { + writeLock.lock(); try { - writeLock.lock(); // Sanity check if (!(newlyParsedQueue instanceof LeafQueue) || !newlyParsedQueue .getQueuePath().equals(getQueuePath())) { @@ -581,9 +581,8 @@ public void reinitialize( public void submitApplicationAttempt(FiCaSchedulerApp application, String userName) { // Careful! Locking order is important! + writeLock.lock(); try { - writeLock.lock(); - // TODO, should use getUser, use this method just to avoid UT failure // which is caused by wrong invoking order, will fix UT separately User user = usersManager.getUserAndAddIfAbsent(userName); @@ -621,8 +620,8 @@ public void submitApplication(ApplicationId applicationId, String userName, public void validateSubmitApplication(ApplicationId applicationId, String userName, String queue) throws AccessControlException { + writeLock.lock(); try { - writeLock.lock(); // Check if the queue is accepting jobs if (getState() != QueueState.RUNNING) { String msg = "Queue " + getQueuePath() @@ -690,8 +689,9 @@ public Resource getUserAMResourceLimitPerPartition( if (userName != null && getUser(userName) != null) { userWeight = getUser(userName).getWeight(); } + + readLock.lock(); try { - readLock.lock(); /* * The user am resource limit is based on the same approach as the user * limit (as it should represent a subset of that). This means that it uses @@ -740,8 +740,8 @@ public Resource getUserAMResourceLimitPerPartition( public Resource calculateAndGetAMResourceLimitPerPartition( String nodePartition) { + writeLock.lock(); try { - writeLock.lock(); /* * For non-labeled partition, get the max value from resources currently * available to the queue and the absolute resources guaranteed for the @@ -793,8 +793,8 @@ public Resource calculateAndGetAMResourceLimitPerPartition( } protected void activateApplications() { + writeLock.lock(); try { - writeLock.lock(); // limit of allowed resource usage for application masters Map userAmPartitionLimit = new HashMap(); @@ -915,8 +915,8 @@ protected void activateApplications() { private void addApplicationAttempt(FiCaSchedulerApp application, User user) { + writeLock.lock(); try { - writeLock.lock(); // Accept user.submitApplication(); getPendingAppsOrderingPolicy().addSchedulableEntity(application); @@ -968,9 +968,9 @@ public void finishApplicationAttempt(FiCaSchedulerApp application, String queue) private void removeApplicationAttempt( FiCaSchedulerApp application, String userName) { - try { - writeLock.lock(); + writeLock.lock(); + try { // TODO, should use getUser, use this method just to avoid UT failure // which is caused by wrong invoking order, will fix UT separately User user = usersManager.getUserAndAddIfAbsent(userName); @@ -1227,8 +1227,8 @@ public boolean accept(Resource cluster, // Do not check limits when allocation from a reserved container if (allocation.getAllocateFromReservedContainer() == null) { + readLock.lock(); try { - readLock.lock(); FiCaSchedulerApp app = schedulerContainer.getSchedulerApplicationAttempt(); String username = app.getUser(); @@ -1328,9 +1328,8 @@ public void apply(Resource cluster, releaseContainers(cluster, request); + writeLock.lock(); try { - writeLock.lock(); - if (request.anythingAllocatedOrReserved()) { ContainerAllocationProposal allocation = request.getFirstAllocatedOrReservedContainer(); @@ -1548,8 +1547,9 @@ public Resource getResourceLimitForAllUsers(String userName, protected boolean canAssignToUser(Resource clusterResource, String userName, Resource limit, FiCaSchedulerApp application, String nodePartition, ResourceLimits currentResourceLimits) { + + readLock.lock(); try { - readLock.lock(); User user = getUser(userName); if (user == null) { if (LOG.isDebugEnabled()) { @@ -1630,8 +1630,8 @@ private void updateSchedulerHealthForCompletedContainer( */ public void recalculateQueueUsageRatio(Resource clusterResource, String nodePartition) { + writeLock.lock(); try { - writeLock.lock(); ResourceUsage queueResourceUsage = getQueueResourceUsage(); if (nodePartition == null) { @@ -1660,8 +1660,8 @@ public void completedContainer(Resource clusterResource, boolean removed = false; // Careful! Locking order is important! + writeLock.lock(); try { - writeLock.lock(); Container container = rmContainer.getContainer(); // Inform the application & the node @@ -1713,8 +1713,8 @@ public void completedContainer(Resource clusterResource, void allocateResource(Resource clusterResource, SchedulerApplicationAttempt application, Resource resource, String nodePartition, RMContainer rmContainer) { + writeLock.lock(); try { - writeLock.lock(); super.allocateResource(clusterResource, resource, nodePartition); // handle ignore exclusivity container @@ -1758,8 +1758,8 @@ void allocateResource(Resource clusterResource, void releaseResource(Resource clusterResource, FiCaSchedulerApp application, Resource resource, String nodePartition, RMContainer rmContainer) { + writeLock.lock(); try { - writeLock.lock(); super.releaseResource(clusterResource, resource, nodePartition); // handle ignore exclusivity container @@ -1814,8 +1814,8 @@ private void updateCurrentResourceLimits( @Override public void updateClusterResource(Resource clusterResource, ResourceLimits currentResourceLimits) { + writeLock.lock(); try { - writeLock.lock(); updateCurrentResourceLimits(currentResourceLimits, clusterResource); lastClusterResource = clusterResource; @@ -1897,8 +1897,8 @@ public void recoverContainer(Resource clusterResource, return; } // Careful! Locking order is important! + writeLock.lock(); try { - writeLock.lock(); FiCaSchedulerNode node = scheduler.getNode( rmContainer.getContainer().getNodeId()); allocateResource(clusterResource, attempt, @@ -1961,8 +1961,8 @@ public void recoverContainer(Resource clusterResource, public Resource getTotalPendingResourcesConsideringUserLimit( Resource clusterResources, String partition, boolean deductReservedFromPending) { + readLock.lock(); try { - readLock.lock(); Map userNameToHeadroom = new HashMap<>(); Resource totalPendingConsideringUserLimit = Resource.newInstance(0, 0); @@ -2005,8 +2005,8 @@ public Resource getTotalPendingResourcesConsideringUserLimit( @Override public void collectSchedulerApplications( Collection apps) { + readLock.lock(); try { - readLock.lock(); for (FiCaSchedulerApp pendingApp : pendingOrderingPolicy .getSchedulableEntities()) { apps.add(pendingApp.getApplicationAttemptId()); @@ -2065,9 +2065,9 @@ public void detachContainer(Resource clusterResource, public Map> getIgnoreExclusivityRMContainers() { Map> clonedMap = new HashMap<>(); - try { - readLock.lock(); + readLock.lock(); + try { for (Map.Entry> entry : ignorePartitionExclusivityRMContainers .entrySet()) { clonedMap.put(entry.getKey(), new TreeSet<>(entry.getValue())); @@ -2116,8 +2116,8 @@ public void setMaxAMResourcePerQueuePercent( void setOrderingPolicy( OrderingPolicy orderingPolicy) { + writeLock.lock(); try { - writeLock.lock(); if (null != this.orderingPolicy) { orderingPolicy.addAllSchedulableEntities( this.orderingPolicy.getSchedulableEntities()); @@ -2135,8 +2135,8 @@ public Priority getDefaultApplicationPriority() { public void updateApplicationPriority(SchedulerApplication app, Priority newAppPriority) { + writeLock.lock(); try { - writeLock.lock(); FiCaSchedulerApp attempt = app.getCurrentAppAttempt(); boolean isActive = orderingPolicy.removeSchedulableEntity(attempt); if (!isActive) { @@ -2187,8 +2187,8 @@ public Resource getClusterResource() { @Override public void stopQueue() { + writeLock.lock(); try { - writeLock.lock(); if (getNumApplications() > 0) { updateQueueState(QueueState.DRAINING); } else { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java index 6788bb4..6c40a23 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java @@ -74,8 +74,8 @@ public ManagedParentQueue(final CapacitySchedulerContext cs, public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource) throws IOException { + writeLock.lock(); try { - writeLock.lock(); validate(newlyParsedQueue); shouldFailAutoCreationWhenGuaranteedCapacityExceeded = @@ -184,9 +184,9 @@ protected void validate(final CSQueue newlyParsedQueue) throws IOException { @Override public void addChildQueue(CSQueue childQueue) throws SchedulerDynamicEditException, IOException { - try { - writeLock.lock(); + writeLock.lock(); + try { if (childQueue == null || !(childQueue instanceof AutoCreatedLeafQueue)) { throw new SchedulerDynamicEditException( "Expected child queue to be an instance of AutoCreatedLeafQueue"); @@ -231,8 +231,8 @@ public void addChildQueue(CSQueue childQueue) } public List getScheduleableApplications() { + readLock.lock(); try { - readLock.lock(); List apps = new ArrayList<>(); for (CSQueue childQueue : getChildQueues()) { apps.addAll(((LeafQueue) childQueue).getApplications()); @@ -244,8 +244,8 @@ public void addChildQueue(CSQueue childQueue) } public List getPendingApplications() { + readLock.lock(); try { - readLock.lock(); List apps = new ArrayList<>(); for (CSQueue childQueue : getChildQueues()) { apps.addAll(((LeafQueue) childQueue).getPendingApplications()); @@ -257,8 +257,8 @@ public void addChildQueue(CSQueue childQueue) } public List getAllApplications() { + readLock.lock(); try { - readLock.lock(); List apps = new ArrayList<>(); for (CSQueue childQueue : getChildQueues()) { apps.addAll(((LeafQueue) childQueue).getAllApplications()); @@ -286,9 +286,9 @@ public boolean shouldFailAutoCreationWhenGuaranteedCapacityExceeded() { public void validateAndApplyQueueManagementChanges( List queueManagementChanges) throws IOException, SchedulerDynamicEditException { - try { - writeLock.lock(); + writeLock.lock(); + try { validateQueueManagementChanges(queueManagementChanges); applyQueueManagementChanges(queueManagementChanges); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index 6d3794e..74734f3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -125,8 +125,8 @@ private String getQueueOrderingPolicyConfigName() { protected void setupQueueConfigs(Resource clusterResource) throws IOException { + writeLock.lock(); try { - writeLock.lock(); super.setupQueueConfigs(clusterResource); StringBuilder aclsString = new StringBuilder(); for (Map.Entry e : acls.entrySet()) { @@ -165,8 +165,8 @@ protected void setupQueueConfigs(Resource clusterResource) private static float PRECISION = 0.0005f; // 0.05% precision void setChildQueues(Collection childQueues) { + writeLock.lock(); try { - writeLock.lock(); // Validate float childCapacities = 0; Resource minResDefaultLabel = Resources.createResource(0, 0); @@ -256,8 +256,8 @@ void setChildQueues(Collection childQueues) { @Override public QueueInfo getQueueInfo( boolean includeChildQueues, boolean recursive) { + readLock.lock(); try { - readLock.lock(); QueueInfo queueInfo = getQueueInfo(); List childQueuesInfo = new ArrayList<>(); @@ -278,8 +278,8 @@ public QueueInfo getQueueInfo( private QueueUserACLInfo getUserAclInfo( UserGroupInformation user) { + readLock.lock(); try { - readLock.lock(); QueueUserACLInfo userAclInfo = recordFactory.newRecordInstance( QueueUserACLInfo.class); List operations = new ArrayList(); @@ -301,8 +301,8 @@ private QueueUserACLInfo getUserAclInfo( @Override public List getQueueUserAclInfo( UserGroupInformation user) { + readLock.lock(); try { - readLock.lock(); List userAcls = new ArrayList<>(); // Add parent queue acls @@ -334,8 +334,8 @@ public String toString() { @Override public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource) throws IOException { + writeLock.lock(); try { - writeLock.lock(); // Sanity check if (!(newlyParsedQueue instanceof ParentQueue) || !newlyParsedQueue .getQueuePath().equals(getQueuePath())) { @@ -429,9 +429,8 @@ public void reinitialize(CSQueue newlyParsedQueue, @Override public void submitApplication(ApplicationId applicationId, String user, String queue) throws AccessControlException { - + writeLock.lock(); try { - writeLock.lock(); // Sanity check validateSubmitApplication(applicationId, user, queue); @@ -455,8 +454,8 @@ public void submitApplication(ApplicationId applicationId, String user, public void validateSubmitApplication(ApplicationId applicationId, String userName, String queue) throws AccessControlException { + writeLock.lock(); try { - writeLock.lock(); if (queue.equals(queueName)) { throw new AccessControlException( "Cannot submit application " + "to non-leaf queue: " + queueName); @@ -486,9 +485,8 @@ public void finishApplicationAttempt(FiCaSchedulerApp application, private void addApplication(ApplicationId applicationId, String user) { - + writeLock.lock(); try { - writeLock.lock(); ++numApplications; LOG.info( @@ -515,8 +513,8 @@ public void finishApplication(ApplicationId application, String user) { private void removeApplication(ApplicationId applicationId, String user) { + writeLock.lock(); try { - writeLock.lock(); --numApplications; LOG.info("Application removed -" + " appId: " + applicationId + " user: " @@ -853,8 +851,8 @@ private void printChildQueues() { private void internalReleaseResource(Resource clusterResource, FiCaSchedulerNode node, Resource releasedResource) { + writeLock.lock(); try { - writeLock.lock(); super.releaseResource(clusterResource, releasedResource, node.getPartition()); @@ -890,9 +888,8 @@ public void completedContainer(Resource clusterResource, @Override public void updateClusterResource(Resource clusterResource, ResourceLimits resourceLimits) { + writeLock.lock(); try { - writeLock.lock(); - // Update effective capacity in all parent queue. Set configuredNodelabels = csContext.getConfiguration() .getConfiguredNodeLabels(getQueuePath()); @@ -1132,8 +1129,8 @@ private void deriveCapacityFromAbsoluteConfigurations(String label, @Override public List getChildQueues() { + readLock.lock(); try { - readLock.lock(); return new ArrayList(childQueues); } finally { readLock.unlock(); @@ -1152,8 +1149,8 @@ public void recoverContainer(Resource clusterResource, } // Careful! Locking order is important! + writeLock.lock(); try { - writeLock.lock(); FiCaSchedulerNode node = scheduler.getNode( rmContainer.getContainer().getNodeId()); allocateResource(clusterResource, @@ -1176,8 +1173,8 @@ public ActiveUsersManager getAbstractUsersManager() { @Override public void collectSchedulerApplications( Collection apps) { + readLock.lock(); try { - readLock.lock(); for (CSQueue queue : childQueues) { queue.collectSchedulerApplications(apps); } @@ -1232,8 +1229,8 @@ public int getNumApplications() { void allocateResource(Resource clusterResource, Resource resource, String nodePartition) { + writeLock.lock(); try { - writeLock.lock(); super.allocateResource(clusterResource, resource, nodePartition); /** @@ -1330,8 +1327,8 @@ public void apply(Resource cluster, // Do not modify queue when allocation from reserved container if (allocation.getAllocateFromReservedContainer() == null) { + writeLock.lock(); try { - writeLock.lock(); // Book-keeping // Note: Update headroom to account for current allocation too... allocateResource(cluster, allocation.getAllocatedOrReservedResource(), @@ -1354,8 +1351,8 @@ public void apply(Resource cluster, @Override public void stopQueue() { + this.writeLock.lock(); try { - this.writeLock.lock(); if (getNumApplications() > 0) { updateQueueState(QueueState.DRAINING); } else { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java index 757002f..79afcdc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java @@ -81,8 +81,8 @@ public PlanQueue(CapacitySchedulerContext cs, String queueName, @Override public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource) throws IOException { + writeLock.lock(); try { - writeLock.lock(); // Sanity check if (!(newlyParsedQueue instanceof PlanQueue) || !newlyParsedQueue .getQueuePath().equals(getQueuePath())) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java index 51b9fa8..c1b7157 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java @@ -83,8 +83,8 @@ public String toString() { } private float _get(String label, CapacityType type) { + readLock.lock(); try { - readLock.lock(); Capacities cap = capacitiesMap.get(label); if (null == cap) { return LABEL_DOESNT_EXIST_CAP; @@ -96,8 +96,8 @@ private float _get(String label, CapacityType type) { } private void _set(String label, CapacityType type, float value) { + writeLock.lock(); try { - writeLock.lock(); Capacities cap = capacitiesMap.get(label); if (null == cap) { cap = new Capacities(); @@ -277,8 +277,8 @@ public void setAbsoluteReservedCapacity(String label, float value) { * configurable fields, and load new values */ public void clearConfigurableFields() { + writeLock.lock(); try { - writeLock.lock(); for (String label : capacitiesMap.keySet()) { _set(label, CapacityType.CAP, 0); _set(label, CapacityType.MAX_CAP, 0); @@ -291,8 +291,8 @@ public void clearConfigurableFields() { } public Set getExistingNodeLabels() { + readLock.lock(); try { - readLock.lock(); return new HashSet(capacitiesMap.keySet()); } finally { readLock.unlock(); @@ -301,8 +301,8 @@ public void clearConfigurableFields() { @Override public String toString() { + readLock.lock(); try { - readLock.lock(); return this.capacitiesMap.toString(); } finally { readLock.unlock(); @@ -310,8 +310,8 @@ public String toString() { } public Set getNodePartitionsSet() { + readLock.lock(); try { - readLock.lock(); return capacitiesMap.keySet(); } finally { readLock.unlock(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java index 34f4aa1..d59c02b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java @@ -53,8 +53,8 @@ public ReservationQueue(CapacitySchedulerContext cs, String queueName, @Override public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource) throws IOException { + writeLock.lock(); try { - writeLock.lock(); // Sanity check if (!(newlyParsedQueue instanceof ReservationQueue) || !newlyParsedQueue .getQueuePath().equals(getQueuePath())) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java index 83ee6c0..19ed0d0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java @@ -113,8 +113,8 @@ public UsageRatios() { } private void incUsageRatio(String label, float delta) { + writeLock.lock(); try { - writeLock.lock(); float usage = 0f; if (usageRatios.containsKey(label)) { usage = usageRatios.get(label); @@ -127,8 +127,8 @@ private void incUsageRatio(String label, float delta) { } private float getUsageRatio(String label) { + readLock.lock(); try { - readLock.lock(); Float f = usageRatios.get(label); if (null == f) { return 0.0f; @@ -140,8 +140,8 @@ private float getUsageRatio(String label) { } private void setUsageRatio(String label, float ratio) { + writeLock.lock(); try { - writeLock.lock(); usageRatios.put(label, ratio); } finally { writeLock.unlock(); @@ -178,8 +178,8 @@ public ResourceUsage getResourceUsage() { public float setAndUpdateUsageRatio(ResourceCalculator resourceCalculator, Resource resource, String nodePartition) { + writeLock.lock(); try { - writeLock.lock(); userUsageRatios.setUsageRatio(nodePartition, 0); return updateUsageRatio(resourceCalculator, resource, nodePartition); } finally { @@ -189,8 +189,8 @@ public float setAndUpdateUsageRatio(ResourceCalculator resourceCalculator, public float updateUsageRatio(ResourceCalculator resourceCalculator, Resource resource, String nodePartition) { + writeLock.lock(); try { - writeLock.lock(); float delta; float newRatio = Resources.ratio(resourceCalculator, getUsed(nodePartition), resource); @@ -357,8 +357,8 @@ public void userLimitNeedsRecompute() { // If latestVersionOfUsersState is negative due to overflow, ideally we need // to reset it. This method is invoked from UsersManager and LeafQueue and // all is happening within write/readLock. Below logic can help to set 0. + writeLock.lock(); try { - writeLock.lock(); long value = latestVersionOfUsersState.incrementAndGet(); if (value < 0) { @@ -394,8 +394,8 @@ public User getUser(String userName) { * User Name */ public void removeUser(String userName) { + writeLock.lock(); try { - writeLock.lock(); this.users.remove(userName); // Remove user from active/non-active list as well. @@ -416,8 +416,8 @@ public void removeUser(String userName) { * @return User object */ public User getUserAndAddIfAbsent(String userName) { + writeLock.lock(); try { - writeLock.lock(); User u = getUser(userName); if (null == u) { u = new User(userName); @@ -447,8 +447,8 @@ private void addUser(String userName, User user) { * @return an ArrayList of UserInfo objects who are active in this queue */ public ArrayList getUsersInfo() { + readLock.lock(); try { - readLock.lock(); ArrayList usersToReturn = new ArrayList(); for (Map.Entry entry : getUsers().entrySet()) { User user = entry.getValue(); @@ -493,8 +493,8 @@ public Resource getComputedResourceLimitForActiveUsers(String userName, Map userLimitPerSchedulingMode; + writeLock.lock(); try { - writeLock.lock(); userLimitPerSchedulingMode = preComputedActiveUserLimit.get(nodePartition); if (isRecomputeNeeded(schedulingMode, nodePartition, true)) { @@ -552,8 +552,8 @@ public Resource getComputedResourceLimitForAllUsers(String userName, Map userLimitPerSchedulingMode; + writeLock.lock(); try { - writeLock.lock(); userLimitPerSchedulingMode = preComputedAllUserLimit.get(nodePartition); if (isRecomputeNeeded(schedulingMode, nodePartition, false)) { // recompute @@ -601,8 +601,8 @@ private boolean isRecomputeNeeded(SchedulingMode schedulingMode, */ private void setLocalVersionOfUsersState(String nodePartition, SchedulingMode schedulingMode, boolean isActive) { + writeLock.lock(); try { - writeLock.lock(); Map> localVersionOfUsersState = (isActive) ? localVersionOfActiveUsersState : localVersionOfAllUsersState; @@ -625,8 +625,8 @@ private void setLocalVersionOfUsersState(String nodePartition, */ private long getLocalVersionOfUsersState(String nodePartition, SchedulingMode schedulingMode, boolean isActive) { + this.readLock.lock(); try { - this.readLock.lock(); Map> localVersionOfUsersState = (isActive) ? localVersionOfActiveUsersState : localVersionOfAllUsersState; @@ -824,8 +824,8 @@ private Resource computeUserLimit(String userName, Resource clusterResource, * Cluster Resource */ public void updateUsageRatio(String partition, Resource clusterResource) { + writeLock.lock(); try { - writeLock.lock(); Resource resourceByLabel = labelManager.getResourceByLabel(partition, clusterResource); float consumed = 0; @@ -851,9 +851,9 @@ private void incQueueUsageRatio(String nodePartition, float delta) { @Override public void activateApplication(String user, ApplicationId applicationId) { - try { - this.writeLock.lock(); + this.writeLock.lock(); + try { User userDesc = getUser(user); if (userDesc != null && userDesc.getActiveApplications() <= 0) { return; @@ -884,9 +884,9 @@ public void activateApplication(String user, ApplicationId applicationId) { @Override public void deactivateApplication(String user, ApplicationId applicationId) { - try { - this.writeLock.lock(); + this.writeLock.lock(); + try { Set userApps = usersApplications.get(user); if (userApps != null) { if (userApps.remove(applicationId)) { @@ -918,8 +918,8 @@ public int getNumActiveUsers() { float sumActiveUsersTimesWeights() { float count = 0.0f; + this.readLock.lock(); try { - this.readLock.lock(); for (String u : activeUsersSet) { count += getUser(u).getWeight(); } @@ -931,8 +931,8 @@ float sumActiveUsersTimesWeights() { float sumAllUsersTimesWeights() { float count = 0.0f; + this.readLock.lock(); try { - this.readLock.lock(); for (String u : users.keySet()) { count += getUser(u).getWeight(); } @@ -943,9 +943,8 @@ float sumAllUsersTimesWeights() { } private void updateActiveUsersResourceUsage(String userName) { + this.writeLock.lock(); try { - this.writeLock.lock(); - // For UT case: We might need to add the user to users list. User user = getUserAndAddIfAbsent(userName); ResourceUsage resourceUsage = user.getResourceUsage(); @@ -982,8 +981,8 @@ private void updateActiveUsersResourceUsage(String userName) { } private void updateNonActiveUsersResourceUsage(String userName) { + this.writeLock.lock(); try { - this.writeLock.lock(); // For UT case: We might need to add the user to users list. User user = getUser(userName); @@ -1051,8 +1050,8 @@ private ResourceUsage getTotalResourceUsagePerUser(String userName) { */ public User updateUserResourceUsage(String userName, Resource resource, String nodePartition, boolean isAllocate) { + this.writeLock.lock(); try { - this.writeLock.lock(); // TODO, should use getUser, use this method just to avoid UT failure // which is caused by wrong invoking order, will fix UT separately @@ -1098,8 +1097,8 @@ private void updateResourceUsagePerUser(User user, Resource resource, } public void updateUserWeights() { + this.writeLock.lock(); try { - this.writeLock.lock(); for (Map.Entry ue : users.entrySet()) { ue.getValue().setWeight(getUserWeightFromQueue(ue.getKey())); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptionManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptionManager.java index 76fcd4a..ae07087 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptionManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptionManager.java @@ -43,8 +43,8 @@ public PreemptionManager() { } public void refreshQueues(CSQueue parent, CSQueue current) { + writeLock.lock(); try { - writeLock.lock(); PreemptableQueue parentEntity = null; if (parent != null) { parentEntity = entities.get(parent.getQueueName()); @@ -67,8 +67,8 @@ public void refreshQueues(CSQueue parent, CSQueue current) { } public void addKillableContainer(KillableContainer container) { + writeLock.lock(); try { - writeLock.lock(); PreemptableQueue entity = entities.get(container.getLeafQueueName()); if (null != entity) { entity.addKillableContainer(container); @@ -80,8 +80,8 @@ public void addKillableContainer(KillableContainer container) { } public void removeKillableContainer(KillableContainer container) { + writeLock.lock(); try { - writeLock.lock(); PreemptableQueue entity = entities.get(container.getLeafQueueName()); if (null != entity) { entity.removeKillableContainer(container); @@ -106,8 +106,8 @@ public void updateKillableContainerResource(KillableContainer container, @VisibleForTesting public Map getKillableContainersMap( String queueName, String partition) { + readLock.lock(); try { - readLock.lock(); PreemptableQueue entity = entities.get(queueName); if (entity != null) { Map containers = @@ -129,8 +129,8 @@ public void updateKillableContainerResource(KillableContainer container, } public Resource getKillableResource(String queueName, String partition) { + readLock.lock(); try { - readLock.lock(); PreemptableQueue entity = entities.get(queueName); if (entity != null) { Resource res = entity.getTotalKillableResources().get(partition); @@ -147,8 +147,8 @@ public Resource getKillableResource(String queueName, String partition) { } public Map getShallowCopyOfPreemptableQueues() { + readLock.lock(); try { - readLock.lock(); Map map = new HashMap<>(); for (Map.Entry entry : entities.entrySet()) { String key = entry.getKey(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemanagement/GuaranteedOrZeroCapacityOverTimePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemanagement/GuaranteedOrZeroCapacityOverTimePolicy.java index faa6e6f..b1d3f74 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemanagement/GuaranteedOrZeroCapacityOverTimePolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemanagement/GuaranteedOrZeroCapacityOverTimePolicy.java @@ -202,8 +202,8 @@ private boolean deactivate() { new HashMap(); private float getAbsoluteActivatedChildQueueCapacity(String nodeLabel) { + readLock.lock(); try { - readLock.lock(); Float totalActivatedCapacity = getAbsActivatedChildQueueCapacityByLabel( nodeLabel); if (totalActivatedCapacity != null) { @@ -218,8 +218,8 @@ private float getAbsoluteActivatedChildQueueCapacity(String nodeLabel) { private void incAbsoluteActivatedChildCapacity(String nodeLabel, float childQueueCapacity) { + writeLock.lock(); try { - writeLock.lock(); Float activatedChildCapacity = getAbsActivatedChildQueueCapacityByLabel( nodeLabel); if (activatedChildCapacity != null) { @@ -236,8 +236,8 @@ private void incAbsoluteActivatedChildCapacity(String nodeLabel, private void decAbsoluteActivatedChildCapacity(String nodeLabel, float childQueueCapacity) { + writeLock.lock(); try { - writeLock.lock(); Float activatedChildCapacity = getAbsActivatedChildQueueCapacityByLabel( nodeLabel); if (activatedChildCapacity != null) { @@ -360,8 +360,8 @@ private void initializeLeafQueueTemplate(ManagedParentQueue parentQueue) //synch/add missing leaf queue(s) if any to state updateLeafQueueState(); + readLock.lock(); try { - readLock.lock(); List queueManagementChanges = new ArrayList<>(); List pendingApps = getSortedPendingApplications(); @@ -483,8 +483,8 @@ private float getTotalDeactivatedCapacity( @VisibleForTesting void updateLeafQueueState() { + writeLock.lock(); try { - writeLock.lock(); Set newPartitions = new HashSet<>(); Set newQueues = new HashSet<>(); @@ -570,8 +570,8 @@ private boolean addLeafQueueIfNotExists(Set leafQueues, @VisibleForTesting public boolean isActive(final AutoCreatedLeafQueue leafQueue, String nodeLabel) throws SchedulerDynamicEditException { + readLock.lock(); try { - readLock.lock(); LeafQueueStatePerPartition leafQueueStatus = getLeafQueueState(leafQueue, nodeLabel); return leafQueueStatus.isActive(); @@ -649,8 +649,8 @@ public int getMaxLeavesToBeActivated(float availableCapacity, public void commitQueueManagementChanges( List queueManagementChanges) throws SchedulerDynamicEditException { + writeLock.lock(); try { - writeLock.lock(); for (QueueManagementChange queueManagementChange : queueManagementChanges) { AutoCreatedLeafQueueConfig updatedQueueTemplate = @@ -695,8 +695,8 @@ public void commitQueueManagementChanges( private void activate(final AbstractAutoCreatedLeafQueue leafQueue, String nodeLabel) throws SchedulerDynamicEditException { + writeLock.lock(); try { - writeLock.lock(); getLeafQueueState(leafQueue, nodeLabel).activate(); parentQueueState.incAbsoluteActivatedChildCapacity(nodeLabel, leafQueueTemplateCapacities.getAbsoluteCapacity(nodeLabel)); @@ -707,8 +707,8 @@ private void activate(final AbstractAutoCreatedLeafQueue leafQueue, private void deactivate(final AbstractAutoCreatedLeafQueue leafQueue, String nodeLabel) throws SchedulerDynamicEditException { + writeLock.lock(); try { - writeLock.lock(); getLeafQueueState(leafQueue, nodeLabel).deactivate(); parentQueueState.decAbsoluteActivatedChildCapacity(nodeLabel, @@ -765,9 +765,8 @@ public AutoCreatedLeafQueueConfig getInitialLeafQueueConfiguration( .getClass()); } + writeLock.lock(); try { - writeLock.lock(); - QueueCapacities capacities = new QueueCapacities(false); for (String nodeLabel : leafQueueTemplateNodeLabels) { if (!leafQueueState.createLeafQueueStateIfNotExists(leafQueue, @@ -816,8 +815,8 @@ private void updateCapacityFromTemplate(QueueCapacities capacities, @VisibleForTesting LeafQueueStatePerPartition getLeafQueueState(LeafQueue queue, String partition) throws SchedulerDynamicEditException { + readLock.lock(); try { - readLock.lock(); String queueName = queue.getQueueName(); if (!leafQueueState.containsLeafQueue(queueName, partition)) { throw new SchedulerDynamicEditException( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index d0e8220..ff60860 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -200,8 +200,8 @@ private void updateMultiNodeSortingPolicy(RMApp rmApp) { public boolean containerCompleted(RMContainer rmContainer, ContainerStatus containerStatus, RMContainerEventType event, String partition) { + writeLock.lock(); try { - writeLock.lock(); ContainerId containerId = rmContainer.getContainerId(); // Remove from the list of containers @@ -245,8 +245,8 @@ public boolean containerCompleted(RMContainer rmContainer, public RMContainer allocate(FiCaSchedulerNode node, SchedulerRequestKey schedulerKey, Container container) { + readLock.lock(); try { - readLock.lock(); if (isStopped) { return null; @@ -437,8 +437,8 @@ public boolean accept(Resource cluster, ContainerRequest containerRequest = null; boolean reReservation = false; + readLock.lock(); try { - readLock.lock(); // First make sure no container in release list in final state if (anyContainerInFinalState(request)) { @@ -560,8 +560,8 @@ public boolean apply(Resource cluster, ResourceCommitRequest request, boolean updatePending) { boolean reReservation = false; + writeLock.lock(); try { - writeLock.lock(); // If we allocated something if (request.anythingAllocatedOrReserved()) { @@ -692,8 +692,8 @@ public boolean apply(Resource cluster, ResourceCommitRequest getTotalPendingRequestsPerPartition() { + readLock.lock(); try { - readLock.lock(); Map ret = new HashMap<>(); for (SchedulerRequestKey schedulerKey : appSchedulingInfo @@ -780,8 +780,8 @@ private boolean internalUnreserve(FiCaSchedulerNode node, } public void markContainerForPreemption(ContainerId cont) { + writeLock.lock(); try { - writeLock.lock(); // ignore already completed containers if (liveContainers.containsKey(cont)) { containersToPreempt.add(cont); @@ -803,8 +803,8 @@ public void markContainerForPreemption(ContainerId cont) { */ public Allocation getAllocation(ResourceCalculator resourceCalculator, Resource clusterResource, Resource minimumAllocation) { + writeLock.lock(); try { - writeLock.lock(); Set currentContPreemption = Collections.unmodifiableSet( new HashSet(containersToPreempt)); containersToPreempt.clear(); @@ -872,8 +872,8 @@ public NodeId getNodeIdToUnreserve(SchedulerRequestKey schedulerKey, public void setHeadroomProvider( CapacityHeadroomProvider headroomProvider) { + writeLock.lock(); try { - writeLock.lock(); this.headroomProvider = headroomProvider; } finally { writeLock.unlock(); @@ -882,8 +882,8 @@ public void setHeadroomProvider( @Override public Resource getHeadroom() { + readLock.lock(); try { - readLock.lock(); if (headroomProvider != null) { return headroomProvider.getHeadroom(); } @@ -897,8 +897,8 @@ public Resource getHeadroom() { @Override public void transferStateFromPreviousAttempt( SchedulerApplicationAttempt appAttempt) { + writeLock.lock(); try { - writeLock.lock(); super.transferStateFromPreviousAttempt(appAttempt); this.headroomProvider = ((FiCaSchedulerApp) appAttempt).headroomProvider; } finally { @@ -925,8 +925,8 @@ public void reserve(SchedulerRequestKey schedulerKey, FiCaSchedulerNode node, @VisibleForTesting public RMContainer findNodeToUnreserve(FiCaSchedulerNode node, SchedulerRequestKey schedulerKey, Resource minimumUnreservedResource) { + readLock.lock(); try { - readLock.lock(); // need to unreserve some other container first NodeId idToUnreserve = getNodeIdToUnreserve(schedulerKey, minimumUnreservedResource, rc); @@ -1107,11 +1107,11 @@ public void updateNodeInfoForAMDiagnostics(FiCaSchedulerNode node) { */ @Override public ApplicationResourceUsageReport getResourceUsageReport() { + writeLock.lock(); try { // Use write lock here because // SchedulerApplicationAttempt#getResourceUsageReport updated fields // TODO: improve this - writeLock.lock(); ApplicationResourceUsageReport report = super.getResourceUsageReport(); Resource cluster = rmContext.getScheduler().getClusterResource(); Resource totalPartitionRes = @@ -1174,8 +1174,8 @@ public boolean equals(Object o) { */ public boolean moveReservation(RMContainer reservedContainer, FiCaSchedulerNode sourceNode, FiCaSchedulerNode targetNode) { + writeLock.lock(); try { - writeLock.lock(); if (!sourceNode.getPartition().equals(targetNode.getPartition())) { if (LOG.isDebugEnabled()) { LOG.debug( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/MemoryPlacementConstraintManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/MemoryPlacementConstraintManager.java index bf04672..9436138 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/MemoryPlacementConstraintManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/MemoryPlacementConstraintManager.java @@ -83,8 +83,8 @@ public void registerApplication(ApplicationId appId, Map, PlacementConstraint> constraintMap) { // Check if app already exists. If not, prepare its constraint map. Map constraintsForApp = new HashMap<>(); + readLock.lock(); try { - readLock.lock(); if (appConstraints.get(appId) != null) { LOG.warn("Application {} has already been registered.", appId); return; @@ -109,8 +109,8 @@ public void registerApplication(ApplicationId appId, appId); } // Update appConstraints. + writeLock.lock(); try { - writeLock.lock(); appConstraints.put(appId, constraintsForApp); } finally { writeLock.unlock(); @@ -120,8 +120,8 @@ public void registerApplication(ApplicationId appId, @Override public void addConstraint(ApplicationId appId, Set sourceTags, PlacementConstraint placementConstraint, boolean replace) { + writeLock.lock(); try { - writeLock.lock(); Map constraintsForApp = appConstraints.get(appId); if (constraintsForApp == null) { @@ -140,8 +140,8 @@ public void addConstraint(ApplicationId appId, Set sourceTags, @Override public void addGlobalConstraint(Set sourceTags, PlacementConstraint placementConstraint, boolean replace) { + writeLock.lock(); try { - writeLock.lock(); addConstraintToMap(globalConstraints, sourceTags, placementConstraint, replace); } finally { @@ -181,8 +181,8 @@ private void addConstraintToMap( @Override public Map, PlacementConstraint> getConstraints( ApplicationId appId) { + readLock.lock(); try { - readLock.lock(); if (appConstraints.get(appId) == null) { if (LOG.isDebugEnabled()) { LOG.debug("Application {} is not registered in the Placement " @@ -212,8 +212,8 @@ public PlacementConstraint getConstraint(ApplicationId appId, return null; } String sourceTag = getValidSourceTag(sourceTags); + readLock.lock(); try { - readLock.lock(); if (appConstraints.get(appId) == null) { if (LOG.isDebugEnabled()) { LOG.debug("Application {} is not registered in the Placement " @@ -235,8 +235,8 @@ public PlacementConstraint getGlobalConstraint(Set sourceTags) { return null; } String sourceTag = getValidSourceTag(sourceTags); + readLock.lock(); try { - readLock.lock(); return globalConstraints.get(sourceTag); } finally { readLock.unlock(); @@ -284,8 +284,8 @@ public PlacementConstraint getMultilevelConstraint(ApplicationId appId, @Override public void unregisterApplication(ApplicationId appId) { + writeLock.lock(); try { - writeLock.lock(); appConstraints.remove(appId); } finally { writeLock.unlock(); @@ -298,8 +298,8 @@ public void removeGlobalConstraint(Set sourceTags) { return; } String sourceTag = getValidSourceTag(sourceTags); + writeLock.lock(); try { - writeLock.lock(); globalConstraints.remove(sourceTag); } finally { writeLock.unlock(); @@ -308,8 +308,8 @@ public void removeGlobalConstraint(Set sourceTags) { @Override public int getNumRegisteredApplications() { + readLock.lock(); try { - readLock.lock(); return appConstraints.size(); } finally { readLock.unlock(); @@ -318,8 +318,8 @@ public int getNumRegisteredApplications() { @Override public int getNumGlobalConstraints() { + readLock.lock(); try { - readLock.lock(); return globalConstraints.size(); } finally { readLock.unlock(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java index 4f5b425..5fe7797 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java @@ -131,8 +131,8 @@ public QueueMetrics getMetrics() { void containerCompleted(RMContainer rmContainer, ContainerStatus containerStatus, RMContainerEventType event) { + writeLock.lock(); try { - writeLock.lock(); Container container = rmContainer.getContainer(); ContainerId containerId = container.getId(); @@ -181,8 +181,8 @@ void containerCompleted(RMContainer rmContainer, private void unreserveInternal( SchedulerRequestKey schedulerKey, FSSchedulerNode node) { + writeLock.lock(); try { - writeLock.lock(); Map reservedContainers = this.reservedContainers.get( schedulerKey); RMContainer reservedContainer = reservedContainers.remove( @@ -284,8 +284,8 @@ NodeType getAllowedLocalityLevel( return NodeType.OFF_SWITCH; } + writeLock.lock(); try { - writeLock.lock(); // Default level is NODE_LOCAL if (!allowedLocalityLevel.containsKey(schedulerKey)) { @@ -354,8 +354,8 @@ NodeType getAllowedLocalityLevelByTime( return NodeType.OFF_SWITCH; } + writeLock.lock(); try { - writeLock.lock(); // default level is NODE_LOCAL if (!allowedLocalityLevel.containsKey(schedulerKey)) { @@ -425,8 +425,8 @@ public RMContainer allocate(NodeType type, FSSchedulerNode node, RMContainer rmContainer; Container container; + writeLock.lock(); try { - writeLock.lock(); // Update allowed locality level NodeType allowed = allowedLocalityLevel.get(schedulerKey); if (allowed != null) { @@ -498,8 +498,8 @@ public RMContainer allocate(NodeType type, FSSchedulerNode node, void resetAllowedLocalityLevel( SchedulerRequestKey schedulerKey, NodeType level) { NodeType old; + writeLock.lock(); try { - writeLock.lock(); old = allowedLocalityLevel.put(schedulerKey, level); } finally { writeLock.unlock(); @@ -664,9 +664,8 @@ private Container createContainer(FSSchedulerNode node, Resource capability, @Override public synchronized void recoverContainer(SchedulerNode node, RMContainer rmContainer) { + writeLock.lock(); try { - writeLock.lock(); - super.recoverContainer(node, rmContainer); if (!rmContainer.getState().equals(RMContainerState.COMPLETED)) { @@ -776,8 +775,8 @@ private void setReservation(SchedulerNode node) { String rackName = node.getRackName() == null ? "NULL" : node.getRackName(); + writeLock.lock(); try { - writeLock.lock(); Set rackReservations = reservations.get(rackName); if (rackReservations == null) { rackReservations = new HashSet<>(); @@ -793,8 +792,8 @@ private void clearReservation(SchedulerNode node) { String rackName = node.getRackName() == null ? "NULL" : node.getRackName(); + writeLock.lock(); try { - writeLock.lock(); Set rackReservations = reservations.get(rackName); if (rackReservations != null) { rackReservations.remove(node.getNodeName()); @@ -963,8 +962,8 @@ private Resource assignContainer(FSSchedulerNode node, boolean reserved) { // For each priority, see if we can schedule a node local, rack local // or off-switch request. Rack of off-switch requests may be delayed // (not scheduled) in order to promote better locality. + writeLock.lock(); try { - writeLock.lock(); // TODO (wandga): All logics in this method should be added to // SchedulerPlacement#canDelayTo which is independent from scheduler. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java index 074d5da..8947938 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java @@ -53,9 +53,9 @@ public RMContainer allocate(NodeType type, FiCaSchedulerNode node, SchedulerRequestKey schedulerKey, Container container) { - try { - writeLock.lock(); + writeLock.lock(); + try { if (isStopped) { return null; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java index 4557350..464cbfb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java @@ -155,9 +155,9 @@ private void updateNodeLabels(ResourceRequest request) { public PendingAskUpdateResult updatePendingAsk( Collection requests, boolean recoverPreemptedRequestForAContainer) { - try { - this.writeLock.lock(); + this.writeLock.lock(); + try { PendingAskUpdateResult updateResult = null; // Update resource requests @@ -228,8 +228,8 @@ private ResourceRequest getResourceRequest(String resourceName) { @Override public PendingAsk getPendingAsk(String resourceName) { + readLock.lock(); try { - readLock.lock(); ResourceRequest request = getResourceRequest(resourceName); if (null == request) { return PendingAsk.ZERO; @@ -245,8 +245,8 @@ public PendingAsk getPendingAsk(String resourceName) { @Override public int getOutstandingAsksCount(String resourceName) { + readLock.lock(); try { - readLock.lock(); ResourceRequest request = getResourceRequest(resourceName); if (null == request) { return 0; @@ -353,8 +353,8 @@ private void decResourceRequest(String resourceName, @Override public boolean canAllocate(NodeType type, SchedulerNode node) { + readLock.lock(); try { - readLock.lock(); ResourceRequest r = resourceRequestMap.get( ResourceRequest.ANY); if (r == null || r.getNumContainers() <= 0) { @@ -381,8 +381,8 @@ public boolean canAllocate(NodeType type, SchedulerNode node) { @Override public boolean canDelayTo(String resourceName) { + readLock.lock(); try { - readLock.lock(); ResourceRequest request = getResourceRequest(resourceName); return request == null || request.getRelaxLocality(); } finally { @@ -432,8 +432,8 @@ public void showRequests() { @Override public ContainerRequest allocate(SchedulerRequestKey schedulerKey, NodeType type, SchedulerNode node) { + writeLock.lock(); try { - writeLock.lock(); List resourceRequests = new ArrayList<>(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java index 54e4666..a6de628 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java @@ -363,8 +363,8 @@ private boolean checkCardinalityAndPending(SchedulerNode node) { @Override public boolean canAllocate(NodeType type, SchedulerNode node) { + readLock.lock(); try { - readLock.lock(); return checkCardinalityAndPending(node); } finally { readLock.unlock(); @@ -411,8 +411,8 @@ public int getUniqueLocationAsks() { @Override public void showRequests() { + readLock.lock(); try { - readLock.lock(); if (schedulingRequest != null) { LOG.info(schedulingRequest.toString()); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java index 956391e..b1cec85 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java @@ -191,8 +191,8 @@ public void run() { public NMToken createAndGetNMToken(String applicationSubmitter, ApplicationAttemptId appAttemptId, Container container) { + this.writeLock.lock(); try { - this.writeLock.lock(); HashSet nodeSet = this.appAttemptToNodeKeyMap.get(appAttemptId); NMToken nmToken = null; if (nodeSet != null) { @@ -213,8 +213,8 @@ public NMToken createAndGetNMToken(String applicationSubmitter, } public void registerApplicationAttempt(ApplicationAttemptId appAttemptId) { + this.writeLock.lock(); try { - this.writeLock.lock(); this.appAttemptToNodeKeyMap.put(appAttemptId, new HashSet()); } finally { this.writeLock.unlock(); @@ -225,8 +225,8 @@ public void registerApplicationAttempt(ApplicationAttemptId appAttemptId) { @VisibleForTesting public boolean isApplicationAttemptRegistered( ApplicationAttemptId appAttemptId) { + this.readLock.lock(); try { - this.readLock.lock(); return this.appAttemptToNodeKeyMap.containsKey(appAttemptId); } finally { this.readLock.unlock(); @@ -237,8 +237,8 @@ public boolean isApplicationAttemptRegistered( @VisibleForTesting public boolean isApplicationAttemptNMTokenPresent( ApplicationAttemptId appAttemptId, NodeId nodeId) { + this.readLock.lock(); try { - this.readLock.lock(); HashSet nodes = this.appAttemptToNodeKeyMap.get(appAttemptId); if (nodes != null && nodes.contains(nodeId)) { return true; @@ -251,8 +251,8 @@ public boolean isApplicationAttemptNMTokenPresent( } public void unregisterApplicationAttempt(ApplicationAttemptId appAttemptId) { + this.writeLock.lock(); try { - this.writeLock.lock(); this.appAttemptToNodeKeyMap.remove(appAttemptId); } finally { this.writeLock.unlock(); @@ -265,8 +265,8 @@ public void unregisterApplicationAttempt(ApplicationAttemptId appAttemptId) { * @param nodeId */ public void removeNodeKey(NodeId nodeId) { + this.writeLock.lock(); try { - this.writeLock.lock(); Iterator> appNodeKeySetIterator = this.appAttemptToNodeKeyMap.values().iterator(); while (appNodeKeySetIterator.hasNext()) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/VolumeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/VolumeImpl.java index 068e8a4..01cdf94 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/VolumeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/VolumeImpl.java @@ -123,8 +123,8 @@ public VolumeMetaData getVolumeMeta() { @Override public VolumeState getVolumeState() { + readLock.lock(); try { - readLock.lock(); return stateMachine.getCurrentState(); } finally { readLock.unlock(); @@ -133,8 +133,8 @@ public VolumeState getVolumeState() { @Override public VolumeId getVolumeId() { + readLock.lock(); try { - readLock.lock(); return this.volumeId; } finally { readLock.unlock(); @@ -183,8 +183,8 @@ public VolumeState transition(VolumeImpl volume, @Override public void handle(VolumeEvent event) { + this.writeLock.lock(); try { - this.writeLock.lock(); VolumeId volumeId = event.getVolumeId(); if (volumeId == null) { -- 2.7.4 (Apple Git-66)