diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index bccde53..347c821 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -91,7 +91,8 @@ private ResourceManager resourceManager; private final ReadLock readLock; private final WriteLock writeLock; - + private final Object dispatcherLock = new Object(); + public static final Log LOG = LogFactory.getLog(RMStateStore.class); private enum RMStateStoreState { @@ -591,20 +592,24 @@ public void checkVersion() throws Exception { * RMAppStoredEvent will be sent on completion to notify the RMApp */ @SuppressWarnings("unchecked") - public synchronized void storeNewApplication(RMApp app) { + public void storeNewApplication(RMApp app) { ApplicationSubmissionContext context = app .getApplicationSubmissionContext(); assert context instanceof ApplicationSubmissionContextPBImpl; ApplicationStateData appState = ApplicationStateData.newInstance( app.getSubmitTime(), app.getStartTime(), context, app.getUser()); - dispatcher.getEventHandler().handle(new RMStateStoreAppEvent(appState)); + synchronized(dispatcherLock) { + dispatcher.getEventHandler().handle(new RMStateStoreAppEvent(appState)); + } } @SuppressWarnings("unchecked") - public synchronized void updateApplicationState( + public void updateApplicationState( ApplicationStateData appState) { - dispatcher.getEventHandler().handle(new RMStateUpdateAppEvent(appState)); + synchronized(dispatcherLock) { + dispatcher.getEventHandler().handle(new RMStateUpdateAppEvent(appState)); + } } public void updateFencedState() { @@ -629,7 +634,7 @@ protected abstract void updateApplicationStateInternal(ApplicationId appId, * This does not block the dispatcher threads * RMAppAttemptStoredEvent will be sent on completion to notify the RMAppAttempt */ - public synchronized void storeNewApplicationAttempt(RMAppAttempt appAttempt) { + public void storeNewApplicationAttempt(RMAppAttempt appAttempt) { Credentials credentials = getCredentialsFromAppAttempt(appAttempt); AggregateAppResourceUsage resUsage = @@ -641,16 +646,19 @@ public synchronized void storeNewApplicationAttempt(RMAppAttempt appAttempt) { credentials, appAttempt.getStartTime(), resUsage.getMemorySeconds(), resUsage.getVcoreSeconds()); - - dispatcher.getEventHandler().handle( - new RMStateStoreAppAttemptEvent(attemptState)); + synchronized(dispatcherLock) { + dispatcher.getEventHandler().handle( + new RMStateStoreAppAttemptEvent(attemptState)); + } } @SuppressWarnings("unchecked") - public synchronized void updateApplicationAttemptState( + public void updateApplicationAttemptState( ApplicationAttemptStateData attemptState) { - dispatcher.getEventHandler().handle( - new RMStateUpdateAppAttemptEvent(attemptState)); + synchronized(dispatcherLock) { + dispatcher.getEventHandler().handle( + new RMStateUpdateAppAttemptEvent(attemptState)); + } } /** @@ -778,7 +786,7 @@ public void storeOrUpdateAMRMTokenSecretManager( * There is no notification of completion for this operation. */ @SuppressWarnings("unchecked") - public synchronized void removeApplication(RMApp app) { + public void removeApplication(RMApp app) { ApplicationStateData appState = ApplicationStateData.newInstance( app.getSubmitTime(), app.getStartTime(), @@ -786,8 +794,9 @@ public synchronized void removeApplication(RMApp app) { for(RMAppAttempt appAttempt : app.getAppAttempts().values()) { appState.attempts.put(appAttempt.getAppAttemptId(), null); } - - dispatcher.getEventHandler().handle(new RMStateStoreRemoveAppEvent(appState)); + synchronized(dispatcherLock) { + dispatcher.getEventHandler().handle(new RMStateStoreRemoveAppEvent(appState)); + } } /**