diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index 1c7f987..663e55a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -298,32 +298,35 @@ public RegisterApplicationMasterResponse registerApplicationMaster( // For work-preserving AM restart, retrieve previous attempts' containers // and corresponding NM tokens. - List transferredContainers = - ((AbstractYarnScheduler) rScheduler) + if (app.getApplicationSubmissionContext() + .getKeepContainersAcrossApplicationAttempts()) { + List transferredContainers = ((AbstractYarnScheduler) rScheduler) .getTransferredContainers(applicationAttemptId); - if (!transferredContainers.isEmpty()) { - response.setContainersFromPreviousAttempts(transferredContainers); - List nmTokens = new ArrayList(); - for (Container container : transferredContainers) { - try { - NMToken token = rmContext.getNMTokenSecretManager() - .createAndGetNMToken(app.getUser(), applicationAttemptId, - container); - if (null != token) { - nmTokens.add(token); - } - } catch (IllegalArgumentException e) { - // if it's a DNS issue, throw UnknowHostException directly and that - // will be automatically retried by RMProxy in RPC layer. 
- if (e.getCause() instanceof UnknownHostException) { - throw (UnknownHostException) e.getCause(); + if (!transferredContainers.isEmpty()) { + response.setContainersFromPreviousAttempts(transferredContainers); + List nmTokens = new ArrayList(); + for (Container container : transferredContainers) { + try { + NMToken token = rmContext.getNMTokenSecretManager() + .createAndGetNMToken(app.getUser(), applicationAttemptId, + container); + if (null != token) { + nmTokens.add(token); + } + } catch (IllegalArgumentException e) { + // If it's a DNS issue, throw UnknownHostException directly; + // that will be automatically retried by RMProxy in the RPC + // layer. + if (e.getCause() instanceof UnknownHostException) { + throw (UnknownHostException) e.getCause(); + } } } + response.setNMTokensFromPreviousAttempts(nmTokens); + LOG.info("Application " + appID + " retrieved " + + transferredContainers.size() + " containers from previous" + + " attempts and " + nmTokens.size() + " NM tokens."); } - response.setNMTokensFromPreviousAttempts(nmTokens); - LOG.info("Application " + appID + " retrieved " - + transferredContainers.size() + " containers from previous" - + " attempts and " + nmTokens.size() + " NM tokens."); } response.setSchedulerResourceTypes(rScheduler diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 968a767..6c3754d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -27,6 +27,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -67,6 +69,8 @@ @SuppressWarnings("unchecked") +@Private +@Unstable public abstract class AbstractYarnScheduler extends AbstractService implements ResourceScheduler { @@ -122,21 +126,40 @@ public void serviceInit(Configuration conf) throws Exception { createReleaseCache(); super.serviceInit(conf); } + + /** + * By default, 'applications' is not a concurrent map. Hence it has to be + * accessed via synchronized block. However if schedulers are overriding + * this map to a concurrent version, then this method also can be overridden + * to remove the additional lock. 
+ * + * @param appId ApplicationId + */ + public SchedulerApplication getSchedulerApplication(ApplicationId appId) { + SchedulerApplication schedulerApp = null; + synchronized (this) { + schedulerApp = applications.get(appId); + } + return schedulerApp; + } - public synchronized List getTransferredContainers( + public List getTransferredContainers( ApplicationAttemptId currentAttempt) { ApplicationId appId = currentAttempt.getApplicationId(); - SchedulerApplication app = applications.get(appId); + SchedulerApplication app = getSchedulerApplication(appId); List containerList = new ArrayList(); RMApp appImpl = this.rmContext.getRMApps().get(appId); if (appImpl.getApplicationSubmissionContext().getUnmanagedAM()) { return containerList; } - Collection liveContainers = - app.getCurrentAppAttempt().getLiveContainers(); - ContainerId amContainerId = - rmContext.getRMApps().get(appId).getCurrentAppAttempt() - .getMasterContainer().getId(); + if (app == null) { + return containerList; + } + + Collection liveContainers = app.getCurrentAppAttempt() + .getLiveContainers(); + ContainerId amContainerId = rmContext.getRMApps().get(appId) + .getCurrentAppAttempt().getMasterContainer().getId(); for (RMContainer rmContainer : liveContainers) { if (!rmContainer.getContainerId().equals(amContainerId)) { containerList.add(rmContainer.getContainer()); @@ -216,7 +239,7 @@ protected synchronized void containerLaunchedOnNode( public T getApplicationAttempt(ApplicationAttemptId applicationAttemptId) { SchedulerApplication app = - applications.get(applicationAttemptId.getApplicationId()); + getSchedulerApplication(applicationAttemptId.getApplicationId()); return app == null ? 
null : app.getCurrentAppAttempt(); } @@ -324,7 +347,7 @@ public synchronized void recoverContainersOnNode( continue; } - SchedulerApplication schedulerApp = applications.get(appId); + SchedulerApplication schedulerApp = getSchedulerApplication(appId); if (schedulerApp == null) { LOG.info("Skip recovering container " + container + " for unknown SchedulerApplication. Application current state is " @@ -406,7 +429,7 @@ private RMContainer recoverAndCreateContainer(NMContainerStatus status, container.getId().getApplicationAttemptId(); RMContainer rmContainer = new RMContainerImpl(container, attemptId, node.getNodeID(), - applications.get(attemptId.getApplicationId()).getUser(), rmContext, + getSchedulerApplication(attemptId.getApplicationId()).getUser(), rmContext, status.getCreationTime()); return rmContainer; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 28ce264..c3b834b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -1636,4 +1636,14 @@ private String handleMoveToPlanQueue(String targetQueueName) { } return ret; } + + /** + * 'applications' is a concurrent map. Hence default implementation can be + * overridden to remove synchronized access. 
+ */ + @Override + public SchedulerApplication getSchedulerApplication( + ApplicationId appId) { + return applications.get(appId); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index e8a9555..f6ea29b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -116,6 +116,7 @@ @SuppressWarnings("unchecked") public class FairScheduler extends AbstractYarnScheduler { + private FairSchedulerConfiguration conf; private Resource incrAllocation; @@ -1682,4 +1683,14 @@ private String handleMoveToPlanQueue(String targetQueueName) { } return targetQueueName; } + + /** + * 'applications' is a concurrent map. Hence default implementation can be + * overridden to remove synchronized access. 
+ */ + @Override + public SchedulerApplication getSchedulerApplication( + ApplicationId appId) { + return applications.get(appId); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index e006715..3ede1ca 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -972,4 +972,14 @@ public synchronized boolean checkAccess(UserGroupInformation callerUGI, public Resource getUsedResource() { return usedResource; } + + /** + * 'applications' is a concurrent map. Hence default implementation can be + * overridden to remove synchronized access. + */ + @Override + public SchedulerApplication getSchedulerApplication( + ApplicationId appId) { + return applications.get(appId); + } } -- 1.9.4.msysgit.1