diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java index b24e20e..9a3cb24 100644 --- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java +++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java @@ -860,4 +860,9 @@ public ApplicationResourceUsageReport getAppResourceUsageReport( public List getAppsInQueue(String queue) { return scheduler.getAppsInQueue(queue); } + + @Override + public RMContainer getRMContainer(ContainerId containerId) { + return null; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java index eaea13e..ca5c661 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java @@ -57,7 +57,8 @@ public static ApplicationSubmissionContext newInstance( ApplicationId applicationId, String applicationName, String queue, Priority priority, ContainerLaunchContext amContainer, boolean isUnmanagedAM, boolean cancelTokensWhenComplete, - int maxAppAttempts, Resource resource, String applicationType) { + int maxAppAttempts, Resource resource, String applicationType, + boolean cleanContainers) { ApplicationSubmissionContext context = Records.newRecord(ApplicationSubmissionContext.class); context.setApplicationId(applicationId); @@ -70,6 +71,7 @@ public static ApplicationSubmissionContext newInstance( context.setMaxAppAttempts(maxAppAttempts); context.setResource(resource); context.setApplicationType(applicationType); + context.setCleanContainersWhenFail(cleanContainers); return context; } @@ -82,7 +84,7 @@ public static ApplicationSubmissionContext newInstance( int maxAppAttempts, Resource resource) { return newInstance(applicationId, applicationName, queue, priority, amContainer, isUnmanagedAM, cancelTokensWhenComplete, maxAppAttempts, - resource, null); + resource, null, true); } /** @@ -268,4 +270,9 @@ public static ApplicationSubmissionContext newInstance( @Public @Stable public abstract void setApplicationType(String applicationType); + + public abstract void setCleanContainersWhenFail(boolean cleanContainers); + + public abstract boolean getCleanContainersWhenFail(); + } \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index 7dc62fc..d00ee0d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -248,6 +248,7 @@ message ApplicationSubmissionContextProto { optional int32 maxAppAttempts = 8 [default = 0]; optional ResourceProto resource = 9; optional string applicationType = 10 [default = "YARN"]; + optional bool clean_containers_when_fail = 11 [default = true]; } enum ApplicationAccessTypeProto { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java index 966995c..45433ff 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java @@ -379,6 +379,8 @@ public void testAMMRTokens() throws Exception { appId = createApp(rmClient, true); waitTillAccepted(rmClient, appId); //unmanaged AMs do return AMRM token + // wait more time for unmanaged AM attempt to be created. + Thread.sleep(3000); Assert.assertNotNull(rmClient.getAMRMToken(appId)); UserGroupInformation other = @@ -393,6 +395,8 @@ public ApplicationId run() throws Exception { ApplicationId appId = createApp(rmClient, true); waitTillAccepted(rmClient, appId); //unmanaged AMs do return AMRM token + // wait more time for unmanaged AM attempt to be created. + Thread.sleep(3000); Assert.assertNotNull(rmClient.getAMRMToken(appId)); return appId; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java index 5b48141..7afbb64 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java @@ -298,6 +298,18 @@ public void setResource(Resource resource) { this.resource = resource; } + @Override + public void setCleanContainersWhenFail(boolean cleanContainers) { + maybeInitBuilder(); + builder.setCleanContainersWhenFail(cleanContainers); + } + + @Override + public boolean getCleanContainersWhenFail() { + ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder; + return p.getCleanContainersWhenFail(); + } + private PriorityPBImpl convertFromProtoFormat(PriorityProto p) { return new PriorityPBImpl(p); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index 29f0ebe..2fd1951 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -421,7 +421,9 @@ public AllocateResponse allocate(AllocateRequest request) LOG.warn("Invalid blacklist request by application " + appAttemptId, e); throw e; } - + + // TODO In the case of work-preserving AM restart, it's possible for the + // AM to release containers form the earlier attempt. try { RMServerUtils.validateContainerReleaseRequest(release, appAttemptId); } catch (InvalidContainerReleaseException e) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 1d451fb..827623b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -657,7 +657,7 @@ private void createNewAttempt(boolean startAttempt) { ApplicationAttemptId.newInstance(applicationId, attempts.size() + 1); RMAppAttempt attempt = new RMAppAttemptImpl(appAttemptId, rmContext, scheduler, masterService, - submissionContext, conf); + submissionContext, conf, maxAppAttempts == attempts.size()); attempts.put(appAttemptId, attempt); currentAttempt = attempt; if(startAttempt) { @@ -1019,7 +1019,15 @@ public AttemptFailedTransition(RMAppState initialState) { public RMAppState transition(RMAppImpl app, RMAppEvent event) { if (!app.submissionContext.getUnmanagedAM() && app.attempts.size() < app.maxAppAttempts) { + RMAppAttempt oldAttempt = app.currentAttempt; app.createNewAttempt(true); + // Copy the info from the previous attempt to the current attempt. Note + // that the previous failed attempt may still be collecting the + // container events from the scheduler and update its data structures + // before the new attempt is created. + if (!((RMAppAttemptImpl) oldAttempt).getShouldCleanContainers()) { + ((RMAppAttemptImpl) app.currentAttempt).recoverAttempt(oldAttempt); + } return initialState; } else { app.rememberTargetTransitionsAndStoreState(event, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index f805f42..c66b2e4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -129,9 +129,9 @@ private SecretKey clientTokenMasterKey = null; //nodes on while this attempt's containers ran - private final Set ranNodes = + private Set ranNodes = new HashSet(); - private final List justFinishedContainers = + private List justFinishedContainers = new ArrayList(); private Container masterContainer; @@ -148,7 +148,8 @@ private final StringBuilder diagnostics = new StringBuilder(); private Configuration conf; - + private final boolean isLastAttempt; + boolean shouldCleanContainers = true; private static final ExpiredTransition EXPIRED_TRANSITION = new ExpiredTransition(); @@ -330,6 +331,14 @@ RMAppAttemptEventType.KILL)) // Transitions from FAILED State + // For work-preserving AM restart, failed attempt are still capturing + // container events for the use by the next new attempt. + .addTransition(RMAppAttemptState.FAILED, RMAppAttemptState.FAILED, + RMAppAttemptEventType.CONTAINER_ACQUIRED, + new ContainerAcquiredTransition()) + .addTransition(RMAppAttemptState.FAILED, RMAppAttemptState.FAILED, + RMAppAttemptEventType.CONTAINER_FINISHED, + new ContainerFinishedAtFailedTransition()) .addTransition( RMAppAttemptState.FAILED, RMAppAttemptState.FAILED, @@ -338,8 +347,7 @@ RMAppAttemptEventType.KILL, RMAppAttemptEventType.UNREGISTERED, RMAppAttemptEventType.STATUS_UPDATE, - RMAppAttemptEventType.CONTAINER_ALLOCATED, - RMAppAttemptEventType.CONTAINER_FINISHED)) + RMAppAttemptEventType.CONTAINER_ALLOCATED)) // Transitions from FINISHING State .addTransition(RMAppAttemptState.FINISHING, @@ -390,7 +398,7 @@ public RMAppAttemptImpl(ApplicationAttemptId appAttemptId, RMContext rmContext, YarnScheduler scheduler, ApplicationMasterService masterService, ApplicationSubmissionContext submissionContext, - Configuration conf) { + Configuration conf, boolean isLastAttempt) { this.conf = conf; this.applicationAttemptId = appAttemptId; this.rmContext = rmContext; @@ -404,7 +412,7 @@ public RMAppAttemptImpl(ApplicationAttemptId appAttemptId, this.writeLock = lock.writeLock(); this.proxiedTrackingUrl = generateProxyUriWithScheme(null); - + this.isLastAttempt = isLastAttempt; this.stateMachine = stateMachineFactory.make(this); } @@ -418,6 +426,10 @@ public ApplicationSubmissionContext getSubmissionContext() { return this.submissionContext; } + public boolean getShouldCleanContainers() { + return this.shouldCleanContainers; + } + @Override public FinalApplicationStatus getFinalApplicationStatus() { this.readLock.lock(); @@ -685,6 +697,11 @@ public void recover(RMState state) throws Exception { this.startTime = attemptState.getStartTime(); } + public void recoverAttempt(RMAppAttempt attempt) { + this.justFinishedContainers = attempt.getJustFinishedContainers(); + this.ranNodes = attempt.getRanNodes(); + } + private void recoverAppAttemptCredentials(Credentials appAttemptTokens) throws IOException { if (appAttemptTokens == null) { @@ -1008,6 +1025,9 @@ public void transition(RMAppAttemptImpl appAttempt, new RMAppFailedAttemptEvent(applicationId, RMAppEventType.ATTEMPT_FAILED, appAttempt.getDiagnostics()); + if (!appAttempt.submissionContext.getCleanContainersWhenFail()) { + appAttempt.shouldCleanContainers = false; + } } break; default: @@ -1018,8 +1038,12 @@ public void transition(RMAppAttemptImpl appAttempt, } appAttempt.eventHandler.handle(appEvent); + if (appAttempt.isLastAttempt + || appAttempt.submissionContext.getUnmanagedAM()) { + appAttempt.shouldCleanContainers = true; + } appAttempt.eventHandler.handle(new AppAttemptRemovedSchedulerEvent( - appAttemptId, finalAttemptState)); + appAttemptId, finalAttemptState, appAttempt.shouldCleanContainers)); appAttempt.removeCredentials(appAttempt); } } @@ -1045,6 +1069,11 @@ public void transition(RMAppAttemptImpl appAttempt, public void transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) { appAttempt.checkAttemptStoreError(event); + // TODO Today unmanaged AM client is waiting for app state to be Accepted to + // launch the AM. This is broken since we changed to start the attempt + // after the application is Accepted. We may need to introduce an attempt + // report that client can rely on to query the attempt state and choose to + // launch the unmanaged AM. super.transition(appAttempt, event); } } @@ -1346,6 +1375,20 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, } } + private static final class ContainerFinishedAtFailedTransition + extends BaseTransition { + @Override + public void + transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) { + RMAppAttemptContainerFinishedEvent containerFinishedEvent = + (RMAppAttemptContainerFinishedEvent) event; + ContainerStatus containerStatus = + containerFinishedEvent.getContainerStatus(); + // Normal container. Add it in completed containers list + appAttempt.justFinishedContainers.add(containerStatus); + } + } + private static class ContainerFinishedFinalStateSavedTransition extends BaseTransition { @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index b5b22b6..65183c2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -59,10 +59,10 @@ final Set priorities = new TreeSet( new org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.Comparator()); - final Map> requests = + final Map> requests = new HashMap>(); - final Set blacklist = new HashSet(); - + private Set blacklist = new HashSet(); + //private final ApplicationStore store; private final ActiveUsersManager activeUsersManager; @@ -399,4 +399,14 @@ synchronized public void stop(RMAppAttemptState rmAppAttemptFinalState) { public synchronized void setQueue(Queue queue) { this.queue = queue; } + + public synchronized Set getBlackList() { + return this.blacklist; + } + + public synchronized void recover(AppSchedulingInfo appInfo) { +// this.priorities = appInfo.getPriorities(); +// this.requests = appInfo.getRequests(); + this.blacklist = appInfo.getBlackList(); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java index 48e3ee8..756245c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java @@ -26,6 +26,8 @@ private final Queue queue; private final String user; + private SchedulerApplicationAttempt currentAttempt; + private boolean shouldRecover = false; public SchedulerApplication(Queue queue, String user) { this.queue = queue; @@ -39,4 +41,20 @@ public Queue getQueue() { public String getUser() { return user; } + + public SchedulerApplicationAttempt getCurrentAppAttempt() { + return currentAttempt; + } + + public void setCurrentAppAttempt(SchedulerApplicationAttempt currentAttempt) { + this.currentAttempt = currentAttempt; + } + + public boolean getShouldRecover() { + return shouldRecover; + } + + public void setShouldRecover(boolean shouldRecover) { + this.shouldRecover = shouldRecover; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index c601cee..f8fd552 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -64,7 +64,7 @@ protected final AppSchedulingInfo appSchedulingInfo; - protected final Map liveContainers = + protected Map liveContainers = new HashMap(); protected final Map> reservedContainers = new HashMap>(); @@ -73,7 +73,7 @@ protected final Resource currentReservation = Resource.newInstance(0, 0); private Resource resourceLimit = Resource.newInstance(0, 0); - protected final Resource currentConsumption = Resource.newInstance(0, 0); + protected Resource currentConsumption = Resource.newInstance(0, 0); protected List newlyAllocatedContainers = new ArrayList(); @@ -407,4 +407,27 @@ public synchronized ApplicationResourceUsageReport getResourceUsageReport() { Resources.add(currentConsumption, currentReservation)); } + public synchronized Map getLiveContainersMap() { + return this.liveContainers; + } + + public synchronized Resource getResourceLimit() { + return this.resourceLimit; + } + + public synchronized Map getLastScheduledContainer() { + return this.lastScheduledContainer; + } + + public synchronized void recover(SchedulerApplicationAttempt appAttempt) { + this.liveContainers = appAttempt.getLiveContainersMap(); +// this.reReservations = appAttempt.reReservations; + this.currentConsumption = appAttempt.getCurrentConsumption(); + this.resourceLimit = appAttempt.getResourceLimit(); +// this.currentReservation = appAttempt.currentReservation; +// this.newlyAllocatedContainers = appAttempt.newlyAllocatedContainers; +// this.schedulingOpportunities = appAttempt.schedulingOpportunities; + this.lastScheduledContainer = appAttempt.getLastScheduledContainer(); + this.appSchedulingInfo.recover(appAttempt.appSchedulingInfo); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java index b0a56a4..4f1cb74 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java @@ -19,13 +19,13 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; import java.io.IOException; -import java.util.Collection; import java.util.List; import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; @@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; /** @@ -170,4 +171,13 @@ boolean checkAccess(UserGroupInformation callerUGI, @LimitedPrivate("yarn") @Stable public List getAppsInQueue(String queueName); + + /** + * Get the container for the given containerId. + * @param containerId + * @return the container for the given containerId. + */ + @LimitedPrivate("yarn") + @Unstable + public RMContainer getRMContainer(ContainerId containerId); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 5f34108..3a211c7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -63,6 +63,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; @@ -71,6 +72,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; @@ -194,10 +196,6 @@ public Configuration getConf() { protected Map applications = new ConcurrentHashMap(); - @VisibleForTesting - protected Map appAttempts = - new ConcurrentHashMap(); - private boolean initialized = false; private ResourceCalculator calculator; @@ -469,16 +467,21 @@ private synchronized void addApplicationAttempt( applications.get(applicationAttemptId.getApplicationId()); CSQueue queue = (CSQueue) application.getQueue(); - FiCaSchedulerApp SchedulerApp = + FiCaSchedulerApp attempt = new FiCaSchedulerApp(applicationAttemptId, application.getUser(), queue, queue.getActiveUsersManager(), rmContext); - appAttempts.put(applicationAttemptId, SchedulerApp); - queue.submitApplicationAttempt(SchedulerApp, application.getUser()); + if (application.getShouldRecover()) { + attempt.recover(application.getCurrentAppAttempt()); + application.setShouldRecover(false); + } + application.setCurrentAppAttempt(attempt); + + queue.submitApplicationAttempt(attempt, application.getUser()); LOG.info("Added Application Attempt " + applicationAttemptId + " to scheduler from user " + application.getUser() + " in queue " + queue.getQueueName()); - rmContext.getDispatcher().getEventHandler().handle( - new RMAppAttemptEvent(applicationAttemptId, + rmContext.getDispatcher().getEventHandler() .handle( + new RMAppAttemptEvent(applicationAttemptId, RMAppAttemptEventType.ATTEMPT_ADDED)); } @@ -486,7 +489,8 @@ private synchronized void doneApplication(ApplicationId applicationId, RMAppState finalState) { SchedulerApplication application = applications.get(applicationId); if (application == null){ - // The AppRemovedSchedulerEvent maybe sent on recovery for completed apps. + // The AppRemovedSchedulerEvent maybe sent on recovery for completed apps, + // ignore it. return; } CSQueue queue = (CSQueue) application.getQueue(); @@ -501,52 +505,59 @@ private synchronized void doneApplication(ApplicationId applicationId, private synchronized void doneApplicationAttempt( ApplicationAttemptId applicationAttemptId, - RMAppAttemptState rmAppAttemptFinalState) { + RMAppAttemptState rmAppAttemptFinalState, boolean cleanContainers) { LOG.info("Application Attempt " + applicationAttemptId + " is done." + " finalState=" + rmAppAttemptFinalState); - FiCaSchedulerApp application = getApplication(applicationAttemptId); + FiCaSchedulerApp attempt = getApplication(applicationAttemptId); + SchedulerApplication application = + applications.get(applicationAttemptId.getApplicationId()); - if (application == null) { - // throw new IOException("Unknown application " + applicationId + - // " has completed!"); + if (application == null || attempt == null) { LOG.info("Unknown application " + applicationAttemptId + " has completed!"); return; } - - // Release all the running containers - for (RMContainer rmContainer : application.getLiveContainers()) { - completedContainer(rmContainer, - SchedulerUtils.createAbnormalContainerStatus( - rmContainer.getContainerId(), - SchedulerUtils.COMPLETED_APPLICATION), - RMContainerEventType.KILL); + + // Release all the allocated, acquired, running containers + for (RMContainer rmContainer : attempt.getLiveContainers()) { + if (!cleanContainers + && rmContainer.getState().equals(RMContainerState.RUNNING)) { + // do not kill the running container in the case of work-preserving AM + // restart. + continue; + } + completedContainer( + rmContainer, + SchedulerUtils.createAbnormalContainerStatus( + rmContainer.getContainerId(), SchedulerUtils.COMPLETED_APPLICATION), + RMContainerEventType.KILL); } - - // Release all reserved containers - for (RMContainer rmContainer : application.getReservedContainers()) { - completedContainer(rmContainer, - SchedulerUtils.createAbnormalContainerStatus( - rmContainer.getContainerId(), - "Application Complete"), - RMContainerEventType.KILL); + + // Release all reserved containers + for (RMContainer rmContainer : attempt.getReservedContainers()) { + completedContainer( + rmContainer, + SchedulerUtils.createAbnormalContainerStatus( + rmContainer.getContainerId(), "Application Complete"), + RMContainerEventType.KILL); } - + // Clean up pending requests, metrics etc. - application.stop(rmAppAttemptFinalState); - + attempt.stop(rmAppAttemptFinalState); + + // Inform application whether to recover the next attempt with the running + // containers of this attempt. + application.setShouldRecover(!cleanContainers); + // Inform the queue - String queueName = application.getQueue().getQueueName(); + String queueName = attempt.getQueue().getQueueName(); CSQueue queue = queues.get(queueName); if (!(queue instanceof LeafQueue)) { LOG.error("Cannot finish application " + "from non-leaf queue: " + queueName); } else { - queue.finishApplicationAttempt(application, queue.getQueueName()); + queue.finishApplicationAttempt(attempt, queue.getQueueName()); } - - // Remove from our data-structure - appAttempts.remove(applicationAttemptId); } private static final Allocation EMPTY_ALLOCATION = @@ -700,8 +711,9 @@ private synchronized void nodeUpdate(RMNode nm) { RMContainer reservedContainer = node.getReservedContainer(); if (reservedContainer != null) { - FiCaSchedulerApp reservedApplication = - getApplication(reservedContainer.getApplicationAttemptId()); + FiCaSchedulerApp reservedApplication = + (FiCaSchedulerApp) getCurrentAttemptForContainer(reservedContainer + .getContainerId()); // Try to fulfill the reservation LOG.info("Trying to fulfill reservation for application " + @@ -738,12 +750,12 @@ private synchronized void nodeUpdate(RMNode nm) { private void containerLaunchedOnNode(ContainerId containerId, FiCaSchedulerNode node) { // Get the application for the finished container - ApplicationAttemptId applicationAttemptId = containerId.getApplicationAttemptId(); - FiCaSchedulerApp application = getApplication(applicationAttemptId); + FiCaSchedulerApp application = + (FiCaSchedulerApp) getCurrentAttemptForContainer(containerId); if (application == null) { - LOG.info("Unknown application: " + applicationAttemptId + - " launched container " + containerId + - " on node: " + node); + LOG.info("Unknown application " + + containerId.getApplicationAttemptId().getApplicationId() + + " launched container " + containerId + " on node: " + node); this.rmContext.getDispatcher().getEventHandler() .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId)); return; @@ -799,7 +811,8 @@ public void handle(SchedulerEvent event) { AppAttemptRemovedSchedulerEvent appAttemptRemovedEvent = (AppAttemptRemovedSchedulerEvent) event; doneApplicationAttempt(appAttemptRemovedEvent.getApplicationAttemptID(), - appAttemptRemovedEvent.getFinalAttemptState()); + appAttemptRemovedEvent.getFinalAttemptState(), + appAttemptRemovedEvent.getShouldCleanContainers()); } break; case CONTAINER_EXPIRED: @@ -874,13 +887,13 @@ private synchronized void completedContainer(RMContainer rmContainer, Container container = rmContainer.getContainer(); // Get the application for the finished container - ApplicationAttemptId applicationAttemptId = - container.getId().getApplicationAttemptId(); - FiCaSchedulerApp application = getApplication(applicationAttemptId); + FiCaSchedulerApp application = + (FiCaSchedulerApp) getCurrentAttemptForContainer(container.getId()); + ApplicationId appId = + container.getId().getApplicationAttemptId().getApplicationId(); if (application == null) { - LOG.info("Container " + container + " of" + - " unknown application " + applicationAttemptId + - " completed with event " + event); + LOG.info("Container " + container + " of" + " unknown application " + + appId + " completed with event " + event); return; } @@ -892,15 +905,19 @@ private synchronized void completedContainer(RMContainer rmContainer, queue.completedContainer(clusterResource, application, node, rmContainer, containerStatus, event, null); - LOG.info("Application " + applicationAttemptId + - " released container " + container.getId() + - " on node: " + node + - " with event: " + event); + LOG.info("Application attempt " + application.getApplicationAttemptId() + + " released container " + container.getId() + " on node: " + node + + " with event: " + event); } @Lock(Lock.NoLock.class) FiCaSchedulerApp getApplication(ApplicationAttemptId applicationAttemptId) { - return appAttempts.get(applicationAttemptId); + SchedulerApplication app = + applications.get(applicationAttemptId.getApplicationId()); + if (app != null) { + return (FiCaSchedulerApp) app.getCurrentAppAttempt(); + } + return null; } @Override @@ -922,10 +939,23 @@ FiCaSchedulerNode getNode(NodeId nodeId) { return nodes.get(nodeId); } - private RMContainer getRMContainer(ContainerId containerId) { - FiCaSchedulerApp application = - getApplication(containerId.getApplicationAttemptId()); - return (application == null) ? null : application.getRMContainer(containerId); + @Override + public RMContainer getRMContainer(ContainerId containerId) { + FiCaSchedulerApp attempt = + (FiCaSchedulerApp) getCurrentAttemptForContainer(containerId); + return (attempt == null) ? null : attempt.getRMContainer(containerId); + } + + @VisibleForTesting + public SchedulerApplicationAttempt getCurrentAttemptForContainer( + ContainerId containerId) { + SchedulerApplication app = + applications.get(containerId.getApplicationAttemptId() + .getApplicationId()); + if (app != null) { + return app.getCurrentAppAttempt(); + } + return null; } @Override @@ -958,7 +988,7 @@ public void preemptContainer(ApplicationAttemptId aid, RMContainer cont) { LOG.debug("PREEMPT_CONTAINER: application:" + aid.toString() + " container: " + cont.toString()); } - FiCaSchedulerApp app = appAttempts.get(aid); + FiCaSchedulerApp app = getApplication(aid); if (app != null) { app.addPreemptContainer(cont.getContainerId()); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java index 9c5a606..eccea74 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java @@ -220,8 +220,10 @@ public synchronized void reserveResource( } // Cannot reserve more than one application on a given node! - if (!this.reservedContainer.getContainer().getId().getApplicationAttemptId().equals( - reservedContainer.getContainer().getId().getApplicationAttemptId())) { + if (!this.reservedContainer.getContainer().getId() + .getApplicationAttemptId().getApplicationId().equals( + reservedContainer.getContainer().getId() + .getApplicationAttemptId().getApplicationId())) { throw new IllegalStateException("Trying to reserve" + " container " + reservedContainer + " for application " + application.getApplicationAttemptId() + diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java index d6fb36d..006f80c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java @@ -26,8 +26,8 @@ private final String queue; private final String user; - public AppAddedSchedulerEvent( - ApplicationId applicationId, String queue, String user) { + public AppAddedSchedulerEvent(ApplicationId applicationId, String queue, + String user) { super(SchedulerEventType.APP_ADDED); this.applicationId = applicationId; this.queue = queue; @@ -45,5 +45,4 @@ public String getQueue() { public String getUser() { return user; } - } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptRemovedSchedulerEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptRemovedSchedulerEvent.java index 876d164..a78f816 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptRemovedSchedulerEvent.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptRemovedSchedulerEvent.java @@ -25,13 +25,15 @@ private final ApplicationAttemptId applicationAttemptId; private final RMAppAttemptState finalAttemptState; + private final boolean shouldCleanContainers; public AppAttemptRemovedSchedulerEvent( ApplicationAttemptId applicationAttemptId, - RMAppAttemptState finalAttemptState) { + RMAppAttemptState finalAttemptState, boolean cleanContainers) { super(SchedulerEventType.APP_ATTEMPT_REMOVED); this.applicationAttemptId = applicationAttemptId; this.finalAttemptState = finalAttemptState; + this.shouldCleanContainers = cleanContainers; } public ApplicationAttemptId getApplicationAttemptID() { @@ -41,4 +43,8 @@ public ApplicationAttemptId getApplicationAttemptID() { public RMAppAttemptState getFinalAttemptState() { return this.finalAttemptState; } + + public boolean getShouldCleanContainers() { + return this.shouldCleanContainers; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index bdfbcab..10bc7f2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -77,6 +77,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; @@ -262,10 +263,22 @@ public QueueManager getQueueManager() { return queueMgr; } - private RMContainer getRMContainer(ContainerId containerId) { - FSSchedulerApp application = - appAttempts.get(containerId.getApplicationAttemptId()); - return (application == null) ? null : application.getRMContainer(containerId); + @Override + public RMContainer getRMContainer(ContainerId containerId) { + FSSchedulerApp attempt = + (FSSchedulerApp) getCurrentAttemptForContainer(containerId); + return (attempt == null) ? null : attempt.getRMContainer(containerId); + } + + private SchedulerApplicationAttempt getCurrentAttemptForContainer( + ContainerId containerId) { + SchedulerApplication app = + applications.get(containerId.getApplicationAttemptId() + .getApplicationId()); + if (app != null) { + return app.getCurrentAppAttempt(); + } + return null; } /** @@ -640,7 +653,8 @@ protected synchronized void addApplication(ApplicationId applicationId, applications.put(applicationId, application); LOG.info("Accepted application " + applicationId + " from user: " + user - + ", in queue: " + queueName); + + ", in queue: " + queueName + ", currently num of applications: " + + applications.size()); rmContext.getDispatcher().getEventHandler() .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED)); } @@ -655,25 +669,28 @@ protected synchronized void addApplicationAttempt( String user = application.getUser(); FSLeafQueue queue = (FSLeafQueue) application.getQueue(); - FSSchedulerApp schedulerApp = + FSSchedulerApp attempt = new FSSchedulerApp(applicationAttemptId, user, queue, new ActiveUsersManager(getRootQueueMetrics()), rmContext); + if (application.getShouldRecover()) { + attempt.recover(application.getCurrentAppAttempt()); + application.setShouldRecover(false); + } + application.setCurrentAppAttempt(attempt); boolean runnable = maxRunningEnforcer.canAppBeRunnable(queue, user); - queue.addApp(schedulerApp, runnable); + queue.addApp(attempt, runnable); if (runnable) { - maxRunningEnforcer.trackRunnableApp(schedulerApp); + maxRunningEnforcer.trackRunnableApp(attempt); } else { - maxRunningEnforcer.trackNonRunnableApp(schedulerApp); + maxRunningEnforcer.trackNonRunnableApp(attempt); } queue.getMetrics().submitApp(user, applicationAttemptId.getAttemptId()); - appAttempts.put(applicationAttemptId, schedulerApp); LOG.info("Added Application Attempt " + applicationAttemptId - + " to scheduler from user: " + user + ", currently active: " - + appAttempts.size()); + + " to scheduler from user: " + user); rmContext.getDispatcher().getEventHandler().handle( new RMAppAttemptEvent(applicationAttemptId, RMAppAttemptEventType.ATTEMPT_ADDED)); @@ -709,19 +726,26 @@ private synchronized void removeApplication(ApplicationId applicationId, private synchronized void removeApplicationAttempt( ApplicationAttemptId applicationAttemptId, - RMAppAttemptState rmAppAttemptFinalState) { + RMAppAttemptState rmAppAttemptFinalState, boolean cleanContainers) { LOG.info("Application " + applicationAttemptId + " is done." + " finalState=" + rmAppAttemptFinalState); + SchedulerApplication application = + applications.get(applicationAttemptId.getApplicationId()); + FSSchedulerApp attempt = getSchedulerApp(applicationAttemptId); - FSSchedulerApp application = appAttempts.get(applicationAttemptId); - - if (application == null) { + if (attempt == null || application == null) { LOG.info("Unknown application " + applicationAttemptId + " has completed!"); return; } // Release all the running containers - for (RMContainer rmContainer : application.getLiveContainers()) { + for (RMContainer rmContainer : attempt.getLiveContainers()) { + if (!cleanContainers + && rmContainer.getState().equals(RMContainerState.RUNNING)) { + // do not kill the running container in the case of work-preserving AM + // restart. + continue; + } completedContainer(rmContainer, SchedulerUtils.createAbnormalContainerStatus( rmContainer.getContainerId(), @@ -730,30 +754,30 @@ private synchronized void removeApplicationAttempt( } // Release all reserved containers - for (RMContainer rmContainer : application.getReservedContainers()) { + for (RMContainer rmContainer : attempt.getReservedContainers()) { completedContainer(rmContainer, SchedulerUtils.createAbnormalContainerStatus( rmContainer.getContainerId(), "Application Complete"), - RMContainerEventType.KILL); + RMContainerEventType.KILL); } - // Clean up pending requests, metrics etc. - application.stop(rmAppAttemptFinalState); + attempt.stop(rmAppAttemptFinalState); + + // Inform application whether to recover the next attempt with the running + // containers of this attempt. + application.setShouldRecover(!cleanContainers); // Inform the queue - FSLeafQueue queue = queueMgr.getLeafQueue(application.getQueue() + FSLeafQueue queue = queueMgr.getLeafQueue(attempt.getQueue() .getQueueName(), false); - boolean wasRunnable = queue.removeApp(application); + boolean wasRunnable = queue.removeApp(attempt); if (wasRunnable) { - maxRunningEnforcer.updateRunnabilityOnAppRemoval(application); + maxRunningEnforcer.updateRunnabilityOnAppRemoval(attempt); } else { - maxRunningEnforcer.untrackNonRunnableApp(application); + maxRunningEnforcer.untrackNonRunnableApp(attempt); } - - // Remove from our data-structure - appAttempts.remove(applicationAttemptId); } /** @@ -769,11 +793,13 @@ private synchronized void completedContainer(RMContainer rmContainer, Container container = rmContainer.getContainer(); // Get the application for the finished container - ApplicationAttemptId applicationAttemptId = container.getId().getApplicationAttemptId(); - FSSchedulerApp application = appAttempts.get(applicationAttemptId); + FSSchedulerApp application = + (FSSchedulerApp) getCurrentAttemptForContainer(container.getId()); + ApplicationId appId = + container.getId().getApplicationAttemptId().getApplicationId(); if (application == null) { LOG.info("Container " + container + " of" + - " unknown application " + applicationAttemptId + + " unknown application attempt " + appId + " completed with event " + event); return; } @@ -790,10 +816,9 @@ private synchronized void completedContainer(RMContainer rmContainer, updateRootQueueMetrics(); } - LOG.info("Application " + applicationAttemptId + - " released container " + container.getId() + - " on node: " + node + - " with event: " + event); + LOG.info("Application attempt " + application.getApplicationAttemptId() + + " released container " + container.getId() + " on node: " + node + + " with event: " + event); } private synchronized void addNode(RMNode node) { @@ -844,7 +869,7 @@ public Allocation allocate(ApplicationAttemptId appAttemptId, List ask, List release, List blacklistAdditions, List blacklistRemovals) { // Make sure this application exists - FSSchedulerApp application = appAttempts.get(appAttemptId); + FSSchedulerApp application = getSchedulerApp(appAttemptId); if (application == null) { LOG.info("Calling allocate on removed " + "or non existant application " + appAttemptId); @@ -914,12 +939,12 @@ public Allocation allocate(ApplicationAttemptId appAttemptId, */ private void containerLaunchedOnNode(ContainerId containerId, FSSchedulerNode node) { // Get the application for the finished container - ApplicationAttemptId applicationAttemptId = containerId.getApplicationAttemptId(); - FSSchedulerApp application = appAttempts.get(applicationAttemptId); + FSSchedulerApp application = + (FSSchedulerApp) getCurrentAttemptForContainer(containerId); if (application == null) { - LOG.info("Unknown application: " + applicationAttemptId + - " launched container " + containerId + - " on node: " + node); + LOG.info("Unknown application " + + containerId.getApplicationAttemptId().getApplicationId() + + " launched container " + containerId + " on node: " + node); return; } @@ -1058,28 +1083,34 @@ public SchedulerNodeReport getNodeReport(NodeId nodeId) { } public FSSchedulerApp getSchedulerApp(ApplicationAttemptId appAttemptId) { - return appAttempts.get(appAttemptId); + SchedulerApplication app = + applications.get(appAttemptId.getApplicationId()); + if (app != null) { + return (FSSchedulerApp) app.getCurrentAppAttempt(); + } + return null; } @Override public SchedulerAppReport getSchedulerAppInfo( ApplicationAttemptId appAttemptId) { - if (!appAttempts.containsKey(appAttemptId)) { + FSSchedulerApp attempt = getSchedulerApp(appAttemptId); + if (attempt == null) { LOG.error("Request for appInfo of unknown attempt" + appAttemptId); return null; } - return new SchedulerAppReport(appAttempts.get(appAttemptId)); + return new SchedulerAppReport(attempt); } @Override public ApplicationResourceUsageReport getAppResourceUsageReport( ApplicationAttemptId appAttemptId) { - FSSchedulerApp app = appAttempts.get(appAttemptId); - if (app == null) { + FSSchedulerApp attempt = getSchedulerApp(appAttemptId); + if (attempt == null) { LOG.error("Request for appInfo of unknown attempt" + appAttemptId); return null; } - return app.getResourceUsageReport(); + return attempt.getResourceUsageReport(); } /** @@ -1153,8 +1184,10 @@ public void handle(SchedulerEvent event) { } AppAttemptRemovedSchedulerEvent appAttemptRemovedEvent = (AppAttemptRemovedSchedulerEvent) event; - removeApplicationAttempt(appAttemptRemovedEvent.getApplicationAttemptID(), - appAttemptRemovedEvent.getFinalAttemptState()); + removeApplicationAttempt( + appAttemptRemovedEvent.getApplicationAttemptID(), + appAttemptRemovedEvent.getFinalAttemptState(), + appAttemptRemovedEvent.getShouldCleanContainers()); break; case CONTAINER_EXPIRED: if (!(event instanceof ContainerExpiredSchedulerEvent)) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 9d42913..2c3fd38 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -67,6 +67,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; @@ -79,6 +80,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; @@ -123,15 +125,11 @@ private Resource maximumAllocation; private boolean usePortForNodeName; + // Use ConcurrentSkipListMap because applications need to be ordered @VisibleForTesting protected Map applications = new ConcurrentSkipListMap(); - // Use ConcurrentSkipListMap because applications need to be ordered - @VisibleForTesting - protected Map appAttempts - = new ConcurrentSkipListMap(); - private ActiveUsersManager activeUsersManager; private static final String DEFAULT_QUEUE_NAME = "default"; @@ -336,9 +334,13 @@ public Allocation allocate( } @VisibleForTesting - FiCaSchedulerApp getApplication( - ApplicationAttemptId applicationAttemptId) { - return appAttempts.get(applicationAttemptId); + FiCaSchedulerApp getApplication(ApplicationAttemptId applicationAttemptId) { + SchedulerApplication app = + applications.get(applicationAttemptId.getApplicationId()); + if (app != null) { + return (FiCaSchedulerApp) app.getCurrentAppAttempt(); + } + return null; } @Override @@ -364,7 +366,8 @@ private synchronized void addApplication(ApplicationId applicationId, SchedulerApplication application = new SchedulerApplication(null, user); applications.put(applicationId, application); - LOG.info("Accepted application " + applicationId + " from user: " + user); + LOG.info("Accepted application " + applicationId + " from user: " + user + + ", currently num of applications: " + applications.size()); rmContext.getDispatcher().getEventHandler() .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED)); } @@ -378,11 +381,16 @@ private synchronized void addApplicationAttempt( FiCaSchedulerApp schedulerApp = new FiCaSchedulerApp(appAttemptId, user, DEFAULT_QUEUE, activeUsersManager, this.rmContext); - appAttempts.put(appAttemptId, schedulerApp); + + if (application.getShouldRecover()) { + schedulerApp.recover(application.getCurrentAppAttempt()); + application.setShouldRecover(false); + } + application.setCurrentAppAttempt(schedulerApp); + metrics.submitApp(user, appAttemptId.getAttemptId()); LOG.info("Added Application Attempt " + appAttemptId - + " to scheduler from user " + application.getUser() - + ", currently active: " + appAttempts.size()); + + " to scheduler from user " + application.getUser()); rmContext.getDispatcher().getEventHandler().handle( new RMAppAttemptEvent(appAttemptId, RMAppAttemptEventType.ATTEMPT_ADDED)); @@ -400,28 +408,36 @@ private synchronized void doneApplication(ApplicationId applicationId, private synchronized void doneApplicationAttempt( ApplicationAttemptId applicationAttemptId, - RMAppAttemptState rmAppAttemptFinalState) + RMAppAttemptState rmAppAttemptFinalState, boolean cleanContainers) throws IOException { - FiCaSchedulerApp application = getApplication(applicationAttemptId); - if (application == null) { + FiCaSchedulerApp attempt = getApplication(applicationAttemptId); + SchedulerApplication application = + applications.get(applicationAttemptId.getApplicationId()); + if (application == null || attempt == null) { throw new IOException("Unknown application " + applicationAttemptId + " has completed!"); } // Kill all 'live' containers - for (RMContainer container : application.getLiveContainers()) { - containerCompleted(container, - SchedulerUtils.createAbnormalContainerStatus( - container.getContainerId(), - SchedulerUtils.COMPLETED_APPLICATION), - RMContainerEventType.KILL); + for (RMContainer container : attempt.getLiveContainers()) { + if (!cleanContainers + && container.getState().equals(RMContainerState.RUNNING)) { + // do not kill the running container in the case of work-preserving AM + // restart. + continue; + } + containerCompleted(container, + SchedulerUtils.createAbnormalContainerStatus( + container.getContainerId(), SchedulerUtils.COMPLETED_APPLICATION), + RMContainerEventType.KILL); } // Clean up pending requests, metrics etc. - application.stop(rmAppAttemptFinalState); + attempt.stop(rmAppAttemptFinalState); - // Remove the application - appAttempts.remove(applicationAttemptId); + // Inform application whether to recover the next attempt with the running + // containers of this attempt. + application.setShouldRecover(!cleanContainers); } /** @@ -432,12 +448,13 @@ private synchronized void doneApplicationAttempt( private void assignContainers(FiCaSchedulerNode node) { LOG.debug("assignContainers:" + " node=" + node.getRMNode().getNodeAddress() + - " #applications=" + appAttempts.size()); + " #applications=" + applications.size()); // Try to assign containers to applications in fifo order - for (Map.Entry e : appAttempts + for (Map.Entry e : applications .entrySet()) { - FiCaSchedulerApp application = e.getValue(); + FiCaSchedulerApp application = + (FiCaSchedulerApp) e.getValue().getCurrentAppAttempt(); LOG.debug("pre-assignContainers"); application.showRequests(); synchronized (application) { @@ -474,8 +491,10 @@ private void assignContainers(FiCaSchedulerNode node) { // Update the applications' headroom to correctly take into // account the containers assigned in this update. - for (FiCaSchedulerApp application : appAttempts.values()) { - application.setHeadroom(Resources.subtract(clusterResource, usedResource)); + for (SchedulerApplication application : applications.values()) { + FiCaSchedulerApp attempt = + (FiCaSchedulerApp) application.getCurrentAppAttempt(); + attempt.setHeadroom(Resources.subtract(clusterResource, usedResource)); } } @@ -754,7 +773,8 @@ public void handle(SchedulerEvent event) { try { doneApplicationAttempt( appAttemptRemovedEvent.getApplicationAttemptID(), - appAttemptRemovedEvent.getFinalAttemptState()); + appAttemptRemovedEvent.getFinalAttemptState(), + appAttemptRemovedEvent.getShouldCleanContainers()); } catch(IOException ie) { LOG.error("Unable to remove application " + appAttemptRemovedEvent.getApplicationAttemptID(), ie); @@ -780,12 +800,12 @@ public void handle(SchedulerEvent event) { private void containerLaunchedOnNode(ContainerId containerId, FiCaSchedulerNode node) { // Get the application for the finished container - ApplicationAttemptId applicationAttemptId = containerId.getApplicationAttemptId(); - FiCaSchedulerApp application = getApplication(applicationAttemptId); + FiCaSchedulerApp application = + (FiCaSchedulerApp) getCurrentAttemptForContainer(containerId); if (application == null) { - LOG.info("Unknown application: " + applicationAttemptId + - " launched container " + containerId + - " on node: " + node); + LOG.info("Unknown application " + + containerId.getApplicationAttemptId().getApplicationId() + + " launched container " + containerId + " on node: " + node); // Some unknown container sneaked into the system. Kill it. this.rmContext.getDispatcher().getEventHandler() .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId)); @@ -806,14 +826,16 @@ private synchronized void containerCompleted(RMContainer rmContainer, // Get the application for the finished container Container container = rmContainer.getContainer(); - ApplicationAttemptId applicationAttemptId = container.getId().getApplicationAttemptId(); - FiCaSchedulerApp application = getApplication(applicationAttemptId); + FiCaSchedulerApp application = + (FiCaSchedulerApp) getCurrentAttemptForContainer(container.getId()); + ApplicationId appId = + container.getId().getApplicationAttemptId().getApplicationId(); // Get the node on which the container was allocated FiCaSchedulerNode node = getNode(container.getNodeId()); if (application == null) { - LOG.info("Unknown application: " + applicationAttemptId + + LOG.info("Unknown application: " + appId + " released container " + container.getId() + " on node: " + node + " with event: " + event); @@ -829,7 +851,7 @@ private synchronized void containerCompleted(RMContainer rmContainer, // Update total usage Resources.subtractFrom(usedResource, container.getResource()); - LOG.info("Application " + applicationAttemptId + + LOG.info("Application attempt " + application.getApplicationAttemptId() + " released container " + container.getId() + " on node: " + node + " with event: " + event); @@ -887,11 +909,23 @@ public synchronized SchedulerNodeReport getNodeReport(NodeId nodeId) { FiCaSchedulerNode node = getNode(nodeId); return node == null ? null : new SchedulerNodeReport(node); } - - private RMContainer getRMContainer(ContainerId containerId) { - FiCaSchedulerApp application = - getApplication(containerId.getApplicationAttemptId()); - return (application == null) ? null : application.getRMContainer(containerId); + + @Override + public RMContainer getRMContainer(ContainerId containerId) { + FiCaSchedulerApp attempt = + (FiCaSchedulerApp) getCurrentAttemptForContainer(containerId); + return (attempt == null) ? null : attempt.getRMContainer(containerId); + } + + private SchedulerApplicationAttempt getCurrentAttemptForContainer( + ContainerId containerId) { + SchedulerApplication app = + applications.get(containerId.getApplicationAttemptId() + .getApplicationId()); + if (app != null) { + return app.getCurrentAppAttempt(); + } + return null; } @Override @@ -908,12 +942,12 @@ public synchronized boolean checkAccess(UserGroupInformation callerUGI, @Override public synchronized List getAppsInQueue(String queueName) { if (queueName.equals(DEFAULT_QUEUE.getQueueName())) { - List apps = new ArrayList( - appAttempts.size()); - for (FiCaSchedulerApp app : appAttempts.values()) { - apps.add(app.getApplicationAttemptId()); + List attempts = new ArrayList( + applications.size()); + for (SchedulerApplication app : applications.values()) { + attempts.add(app.getCurrentAppAttempt().getApplicationAttemptId()); } - return apps; + return attempts; } else { return null; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java index eb69162..1dcac06 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java @@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.YarnVersionInfo; +import org.mortbay.log.Log; public class MockNM { @@ -130,12 +131,13 @@ public NodeHeartbeatResponse nodeHeartbeat(ApplicationAttemptId attemptId, int containerId, ContainerState containerState) throws Exception { HashMap> nodeUpdate = new HashMap>(1); - ContainerStatus amContainerStatus = BuilderUtils.newContainerStatus( - BuilderUtils.newContainerId(attemptId, 1), - ContainerState.COMPLETE, "Success", 0); + ContainerStatus containerStatus = BuilderUtils.newContainerStatus( + BuilderUtils.newContainerId(attemptId, containerId), containerState, + "Success", 0); ArrayList containerStatusList = new ArrayList(1); - containerStatusList.add(amContainerStatus); + containerStatusList.add(containerStatus); + Log.info("ContainerStatus: " + containerStatus); nodeUpdate.put(attemptId.getApplicationId(), containerStatusList); return nodeHeartbeat(nodeUpdate, true); } @@ -152,6 +154,7 @@ public NodeHeartbeatResponse nodeHeartbeat(Map> entry : conts.entrySet()) { + Log.info("entry.getValue() " + entry.getValue()); status.setContainersStatuses(entry.getValue()); } NodeHealthStatus healthStatus = Records.newRecord(NodeHealthStatus.class); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 76d8b1a..16118e1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; @@ -56,6 +57,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; @@ -122,6 +125,22 @@ public void waitForState(ApplicationAttemptId attemptId, attempt.getAppAttemptState()); } + public void waitForState(MockNM nm, ContainerId containerId, + RMContainerState containerState) throws Exception { + RMContainer container = getResourceScheduler().getRMContainer(containerId); + Assert.assertNotNull("Container shouldn't be null", container); + int timeoutSecs = 0; + while (!containerState.equals(container.getState()) && timeoutSecs++ < 40) { + System.out.println("Container : " + containerId + " State is : " + + container.getState() + " Waiting for state : " + containerState); + nm.nodeHeartbeat(true); + Thread.sleep(1000); + } + System.out.println("Container State is : " + container.getState()); + Assert.assertEquals("Container state is not correct (timedout)", + containerState, container.getState()); + } + // get new application id public GetNewApplicationResponse getNewAppId() throws Exception { ApplicationClientProtocol client = getClientRMService(); @@ -166,13 +185,14 @@ public RMApp submitApp(int masterMemory, String name, String user, Map acls, boolean unmanaged, String queue, int maxAppAttempts, Credentials ts, String appType) throws Exception { return submitApp(masterMemory, name, user, acls, unmanaged, queue, - maxAppAttempts, ts, appType, true); + maxAppAttempts, ts, appType, true, true); } public RMApp submitApp(int masterMemory, String name, String user, Map acls, boolean unmanaged, String queue, int maxAppAttempts, Credentials ts, String appType, - boolean waitForAccepted) throws Exception { + boolean waitForAccepted, boolean cleanContainersWhenFail) + throws Exception { ApplicationClientProtocol client = getClientRMService(); GetNewApplicationResponse resp = client.getNewApplication(Records .newRecord(GetNewApplicationRequest.class)); @@ -182,6 +202,7 @@ public RMApp submitApp(int masterMemory, String name, String user, .newRecord(SubmitApplicationRequest.class); ApplicationSubmissionContext sub = Records .newRecord(ApplicationSubmissionContext.class); + sub.setCleanContainersWhenFail(cleanContainersWhenFail); sub.setApplicationId(appId); sub.setApplicationName(name); sub.setMaxAppAttempts(maxAppAttempts); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/QueueACLsTestBase.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/QueueACLsTestBase.java index b400e4f..baf0a7b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/QueueACLsTestBase.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/QueueACLsTestBase.java @@ -221,7 +221,7 @@ private ApplicationId submitAppAndGetAppId(String submitter, ApplicationSubmissionContext appSubmissionContext = ApplicationSubmissionContext.newInstance(applicationId, "applicationName", queueName, null, amContainerSpec, false, true, 1, - resource, "applicationType"); + resource, "applicationType", true); appSubmissionContext.setApplicationId(applicationId); appSubmissionContext.setQueue(queueName); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java index cbb70d5..77398a7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java @@ -649,7 +649,7 @@ private RMAppImpl getRMApp(RMContext rmContext, YarnScheduler yarnScheduler, .currentTimeMillis(), "YARN")); ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(applicationId3, 1); RMAppAttemptImpl rmAppAttemptImpl = new RMAppAttemptImpl(attemptId, - rmContext, yarnScheduler, null, asContext, config); + rmContext, yarnScheduler, null, asContext, config, false); when(app.getCurrentAppAttempt()).thenReturn(rmAppAttemptImpl); return app; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java index a2bf4ae..eb28910 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java @@ -295,6 +295,8 @@ public void testNMToken() throws Exception { nm2.nodeHeartbeat(attempt.getAppAttemptId(), container.getId().getId(), ContainerState.COMPLETE); } + nm1.nodeHeartbeat(am.getApplicationAttemptId(), 1, + ContainerState.COMPLETE); am.waitForState(RMAppAttemptState.FINISHED); Assert.assertFalse(nmTokenSecretManager .isApplicationAttemptRegistered(attempt.getAppAttemptId())); @@ -458,7 +460,7 @@ public void testInvalidatedAMHostPortOnAMRestart() throws Exception { Assert.assertEquals(-1, report1.getRpcPort()); } - private MockAM launchAM(RMApp app, MockRM rm, MockNM nm) + public static MockAM launchAM(RMApp app, MockRM rm, MockNM nm) throws Exception { RMAppAttempt attempt = app.getCurrentAppAttempt(); nm.nodeHeartbeat(true); @@ -468,7 +470,7 @@ private MockAM launchAM(RMApp app, MockRM rm, MockNM nm) return am; } - private void finishApplicationMaster(RMApp rmApp, MockRM rm, MockNM nm, + public static void finishApplicationMaster(RMApp rmApp, MockRM rm, MockNM nm, MockAM am) throws Exception { FinishApplicationMasterRequest req = FinishApplicationMasterRequest.newInstance( diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index 440bddc..5e74f8d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -112,7 +112,6 @@ public void setup() throws UnknownHostException { Assert.assertTrue(YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS > 1); } - @SuppressWarnings("rawtypes") @Test (timeout=180000) public void testRMRestart() throws Exception { conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, @@ -713,7 +712,7 @@ public synchronized void updateApplicationAttemptStateInternal( RMApp app0 = rm1.submitApp(200, "name", "user", new HashMap(), false, "default", -1, - null, "MAPREDUCE", false); + null, "MAPREDUCE", false, true); // kill the app. rm1.killApp(app0.getApplicationId()); rm1.waitForState(app0.getApplicationId(), RMAppState.KILLED); @@ -1406,7 +1405,7 @@ protected void handleStoreEvent(RMStateStoreEvent event) { for (int i = 0; i < NUM_APPS; i++) { RMApp app = rm1.submitApp(200, "name", "user", new HashMap(), false, - "default", -1, null, "MAPREDUCE", false); + "default", -1, null, "MAPREDUCE", false, true); appList.add(app); rm1.waitForState(app.getApplicationId(), RMAppState.NEW_SAVING); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java index 1f9d179..6ff2b54 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java @@ -164,7 +164,7 @@ public void testResourceAllocation() throws IOException, // Notify scheduler application is finished. AppAttemptRemovedSchedulerEvent appRemovedEvent1 = new AppAttemptRemovedSchedulerEvent( - application.getApplicationAttemptId(), RMAppAttemptState.FINISHED); + application.getApplicationAttemptId(), RMAppAttemptState.FINISHED, true); resourceManager.getResourceScheduler().handle(appRemovedEvent1); checkResourceUsage(nm1, nm2); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java index 8832769..2c52f1a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java @@ -18,49 +18,30 @@ package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Arrays; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; import junit.framework.Assert; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.YarnApplicationState; -import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ApplicationAccessType; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.Priority; -import org.apache.hadoop.yarn.api.records.QueueInfo; -import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; -import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.factories.RecordFactory; -import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService; -import org.apache.hadoop.yarn.server.resourcemanager.RMContext; -import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; -import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; -import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; -import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager; -import org.apache.hadoop.yarn.util.resource.Resources; -import org.junit.After; -import org.junit.Before; +import org.apache.hadoop.yarn.server.resourcemanager.MockAM; +import org.apache.hadoop.yarn.server.resourcemanager.MockNM; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.TestRM; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.junit.Test; /** @@ -68,238 +49,112 @@ * */ public class TestAMRestart { -// private static final Log LOG = LogFactory.getLog(TestAMRestart.class); -// ApplicationsManagerImpl appImpl; -// RMContext asmContext = new RMContextImpl(new MemStore()); -// ApplicationTokenSecretManager appTokenSecretManager = -// new ApplicationTokenSecretManager(); -// DummyResourceScheduler scheduler; -// private ClientRMService clientRMService; -// int count = 0; -// ApplicationId appID; -// final int maxFailures = 3; -// AtomicInteger launchNotify = new AtomicInteger(); -// AtomicInteger schedulerNotify = new AtomicInteger(); -// volatile boolean stop = false; -// int schedulerAddApplication = 0; -// int schedulerRemoveApplication = 0; -// int launcherLaunchCalled = 0; -// int launcherCleanupCalled = 0; -// private final static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); -// -// private class ExtApplicationsManagerImpl extends ApplicationsManagerImpl { -// public ExtApplicationsManagerImpl( -// ApplicationTokenSecretManager applicationTokenSecretManager, -// YarnScheduler scheduler, RMContext asmContext) { -// super(applicationTokenSecretManager, scheduler, asmContext); -// } -// -// @Override -// public EventHandler> createNewApplicationMasterLauncher( -// ApplicationTokenSecretManager tokenSecretManager) { -// return new DummyAMLauncher(); -// } -// } -// -// private class DummyAMLauncher implements EventHandler> { -// -// public DummyAMLauncher() { -// asmContext.getDispatcher().register(AMLauncherEventType.class, this); -// new Thread() { -// public void run() { -// while (!stop) { -// LOG.info("DEBUG -- waiting for launch"); -// synchronized(launchNotify) { -// while (launchNotify.get() == 0) { -// try { -// launchNotify.wait(); -// } catch (InterruptedException e) { -// } -// } -// asmContext.getDispatcher().getEventHandler().handle( -// new ApplicationEvent( -// ApplicationEventType.LAUNCHED, appID)); -// launchNotify.addAndGet(-1); -// } -// } -// } -// }.start(); -// } -// -// @Override -// public void handle(ASMEvent event) { -// switch (event.getType()) { -// case CLEANUP: -// launcherCleanupCalled++; -// break; -// case LAUNCH: -// LOG.info("DEBUG -- launching"); -// launcherLaunchCalled++; -// synchronized (launchNotify) { -// launchNotify.addAndGet(1); -// launchNotify.notify(); -// } -// break; -// default: -// break; -// } -// } -// } -// -// private class DummyResourceScheduler implements ResourceScheduler { -// -// @Override -// public void removeNode(RMNode node) { -// } -// -// @Override -// public Allocation allocate(ApplicationId applicationId, -// List ask, List release) throws IOException { -// Container container = recordFactory.newRecordInstance(Container.class); -// container.setContainerToken(recordFactory.newRecordInstance(ContainerToken.class)); -// container.setNodeId(recordFactory.newRecordInstance(NodeId.class)); -// container.setContainerManagerAddress("localhost"); -// container.setNodeHttpAddress("localhost:8042"); -// container.setId(recordFactory.newRecordInstance(ContainerId.class)); -// container.getId().setAppId(appID); -// container.getId().setId(count); -// count++; -// return new Allocation(Arrays.asList(container), Resources.none()); -// } -// -// @Override -// public void handle(ASMEvent event) { -// switch (event.getType()) { -// case ADD: -// schedulerAddApplication++; -// break; -// case EXPIRE: -// schedulerRemoveApplication++; -// LOG.info("REMOVING app : " + schedulerRemoveApplication); -// if (schedulerRemoveApplication == maxFailures) { -// synchronized (schedulerNotify) { -// schedulerNotify.addAndGet(1); -// schedulerNotify.notify(); -// } -// } -// break; -// default: -// break; -// } -// } -// -// @Override -// public QueueInfo getQueueInfo(String queueName, -// boolean includeChildQueues, -// boolean recursive) throws IOException { -// return null; -// } -// @Override -// public List getQueueUserAclInfo() { -// return null; -// } -// @Override -// public void addApplication(ApplicationId applicationId, -// ApplicationMaster master, String user, String queue, Priority priority, -// ApplicationStore store) -// throws IOException { -// } -// @Override -// public void addNode(RMNode nodeInfo) { -// } -// @Override -// public void recover(RMState state) throws Exception { -// } -// @Override -// public void reinitialize(Configuration conf, -// ContainerTokenSecretManager secretManager, RMContext rmContext) -// throws IOException { -// } -// -// @Override -// public void nodeUpdate(RMNode nodeInfo, -// Map> containers) { -// } -// -// @Override -// public Resource getMaximumResourceCapability() { -// // TODO Auto-generated method stub -// return null; -// } -// -// @Override -// public Resource getMinimumResourceCapability() { -// // TODO Auto-generated method stub -// return null; -// } -// } -// -// @Before -// public void setUp() { -// -// asmContext.getDispatcher().register(ApplicationEventType.class, -// new ResourceManager.ApplicationEventDispatcher(asmContext)); -// -// appID = recordFactory.newRecordInstance(ApplicationId.class); -// appID.setClusterTimestamp(System.currentTimeMillis()); -// appID.setId(1); -// Configuration conf = new Configuration(); -// scheduler = new DummyResourceScheduler(); -// asmContext.getDispatcher().init(conf); -// asmContext.getDispatcher().start(); -// asmContext.getDispatcher().register(ApplicationTrackerEventType.class, scheduler); -// appImpl = new ExtApplicationsManagerImpl(appTokenSecretManager, scheduler, asmContext); -// -// conf.setLong(YarnConfiguration.AM_EXPIRY_INTERVAL, 1000L); -// conf.setInt(RMConfig.AM_MAX_RETRIES, maxFailures); -// appImpl.init(conf); -// appImpl.start(); -// -// this.clientRMService = new ClientRMService(asmContext, appImpl -// .getAmLivelinessMonitor(), appImpl.getClientToAMSecretManager(), -// scheduler); -// this.clientRMService.init(conf); -// } -// -// @After -// public void tearDown() { -// } -// -// private void waitForFailed(AppAttempt application, ApplicationState -// finalState) throws Exception { -// int count = 0; -// while(application.getState() != finalState && count < 10) { -// Thread.sleep(500); -// count++; -// } -// Assert.assertEquals(finalState, application.getState()); -// } -// -// @Test -// public void testAMRestart() throws Exception { -// ApplicationSubmissionContext subContext = recordFactory.newRecordInstance(ApplicationSubmissionContext.class); -// subContext.setApplicationId(appID); -// subContext.setApplicationName("dummyApp"); -//// subContext.command = new ArrayList(); -//// subContext.environment = new HashMap(); -//// subContext.fsTokens = new ArrayList(); -// subContext.setFsTokensTodo(ByteBuffer.wrap(new byte[0])); -// SubmitApplicationRequest request = recordFactory -// .newRecordInstance(SubmitApplicationRequest.class); -// request.setApplicationSubmissionContext(subContext); -// clientRMService.submitApplication(request); -// AppAttempt application = asmContext.getApplications().get(appID); -// synchronized (schedulerNotify) { -// while(schedulerNotify.get() == 0) { -// schedulerNotify.wait(); -// } -// } -// Assert.assertEquals(maxFailures, launcherCleanupCalled); -// Assert.assertEquals(maxFailures, launcherLaunchCalled); -// Assert.assertEquals(maxFailures, schedulerAddApplication); -// Assert.assertEquals(maxFailures, schedulerRemoveApplication); -// Assert.assertEquals(maxFailures, application.getFailedCount()); -// waitForFailed(application, ApplicationState.FAILED); -// stop = true; -// } + + @Test + public void testAMRestartWithExistingContainers() throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); + + MockRM rm1 = new MockRM(conf); + rm1.start(); + RMApp app1 = + rm1.submitApp(200, "name", "user", + new HashMap(), false, "default", -1, + null, "MAPREDUCE", false, false); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); + nm1.registerNode(); + + MockAM am1 = TestRM.launchAM(app1, rm1, nm1); + int NUM_CONTAINERS = 3; + // allocate NUM_CONTAINERS containers + am1.allocate("127.0.0.1", 1024, NUM_CONTAINERS, + new ArrayList()); + nm1.nodeHeartbeat(true); + + // wait for containers to be allocated. + List containers = + am1.allocate(new ArrayList(), + new ArrayList()).getAllocatedContainers(); + while (containers.size() != NUM_CONTAINERS) { + nm1.nodeHeartbeat(true); + containers.addAll(am1.allocate(new ArrayList(), + new ArrayList()).getAllocatedContainers()); + Thread.sleep(200); + } + + // launch the 2nd container + nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 2, ContainerState.RUNNING); + ContainerId containerId2 = + ContainerId.newInstance(am1.getApplicationAttemptId(), 2); + rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING); + + // launch the 3rd container + nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 3, ContainerState.RUNNING); + ContainerId containerId3 = + ContainerId.newInstance(am1.getApplicationAttemptId(), 3); + rm1.waitForState(nm1, containerId3, RMContainerState.RUNNING); + + // 4th container still in AQUIRED state. + ContainerId containerId4 = + ContainerId.newInstance(am1.getApplicationAttemptId(), 4); + rm1.waitForState(nm1, containerId4, RMContainerState.ACQUIRED); + + // fail the AM by sending CONTAINER_FINISHED event without registering. + nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 1, ContainerState.COMPLETE); + am1.waitForState(RMAppAttemptState.FAILED); + + // wait for some time. previous AM's running containers should still remain + // in scheduler even though am failed + Thread.sleep(3000); + rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING); + // acquired container is cleaned up. + Assert.assertNull(rm1.getResourceScheduler().getRMContainer(containerId4)); + + // wait for app to start a new attempt. + rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); + // assert this is a new AM. + ApplicationAttemptId newAttemptId = + app1.getCurrentAppAttempt().getAppAttemptId(); + Assert.assertFalse(newAttemptId.equals(am1.getApplicationAttemptId())); + MockAM am2 = TestRM.launchAM(app1, rm1, nm1); + + // complete container by sending the complete event to earlier attempt. + nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 3, ContainerState.COMPLETE); + rm1.waitForState(nm1, containerId3, RMContainerState.COMPLETED); + + // Even though the completed container containerId3 event was sent to the + // earlier failed attempt, new RMAppAttempt can also capture this container + // info. + // completed containerId4 is also transferred to the new attempt. + RMAppAttempt newAttempt = + app1.getRMAppAttempt(am2.getApplicationAttemptId()); + Assert.assertEquals(2, newAttempt.getJustFinishedContainers().size()); + // containerId4 is the Acquired Container killed by the previous attempt, + // it's now inside new attempt's finished container list. + Assert.assertEquals(containerId4, newAttempt.getJustFinishedContainers() + .get(0).getContainerId()); + // containerId4 is the container ran by previous attempt but finished by the + // new attempt. + Assert.assertEquals(containerId3, newAttempt.getJustFinishedContainers() + .get(1).getContainerId()); + + // New SchedulerApplicationAttempt also has the containers info. + rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING); + + // record the scheduler attempt for testing. + SchedulerApplicationAttempt schedulerNewAttempt = + ((CapacityScheduler) rm1.getResourceScheduler()) + .getCurrentAttemptForContainer(containerId2); + // finish this application + TestRM.finishApplicationMaster(app1, rm1, nm1, am2); + + // the 2nd attempt released the 1st attempt's running container, when the + // 2nd attempt finishes. + Assert.assertFalse(schedulerNewAttempt.getLiveContainers().contains( + containerId2)); + + + rm1.stop(); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java index 5bea03b..c11edd9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java @@ -51,10 +51,12 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.EventHandler; @@ -257,7 +259,7 @@ public void setUp() throws Exception { application = mock(RMApp.class); applicationAttempt = new RMAppAttemptImpl(applicationAttemptId, rmContext, scheduler, - masterService, submissionContext, new Configuration()); + masterService, submissionContext, new Configuration(), false); when(application.getCurrentAppAttempt()).thenReturn(applicationAttempt); when(application.getApplicationId()).thenReturn(applicationId); @@ -1092,6 +1094,44 @@ public void testGetClientToken() throws Exception { Assert.assertNull(token); } + @Test + public void testFailedToFailed() { + // create a failed attempt. + Container amContainer = allocateApplicationAttempt(); + launchApplicationAttempt(amContainer); + runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false); + ContainerStatus cs1 = + ContainerStatus.newInstance(amContainer.getId(), + ContainerState.COMPLETE, "some error", 123); + ApplicationAttemptId appAttemptId = applicationAttempt.getAppAttemptId(); + applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( + appAttemptId, cs1)); + sendAttemptUpdateSavedEvent(applicationAttempt); + assertEquals(RMAppAttemptState.FAILED, + applicationAttempt.getAppAttemptState()); + + // failed attempt captured the container finished event. + assertEquals(0, applicationAttempt.getJustFinishedContainers().size()); + ContainerStatus cs2 = + ContainerStatus.newInstance(ContainerId.newInstance(appAttemptId, 2), + ContainerState.COMPLETE, "", 0); + applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( + appAttemptId, cs2)); + assertEquals(1, applicationAttempt.getJustFinishedContainers().size()); + assertEquals(cs2.getContainerId(), applicationAttempt + .getJustFinishedContainers().get(0).getContainerId()); + + // failed attempt captured the container acquired event. + NodeId nodeId = NodeId.newInstance("localhost", 1111); + assertFalse(applicationAttempt.getRanNodes().contains(nodeId)); + Container container = + Container.newInstance(cs2.getContainerId(), nodeId, null, null, null, + null); + applicationAttempt.handle(new RMAppAttemptContainerAcquiredEvent( + appAttemptId, container)); + assertTrue(applicationAttempt.getRanNodes().contains(nodeId)); + } + private void verifyTokenCount(ApplicationAttemptId appAttemptId, int count) { verify(amRMTokenManager, times(count)).applicationMasterFinished(appAttemptId); if (UserGroupInformation.isSecurityEnabled()) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index 3d49d86..3f670d4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -597,66 +597,6 @@ public void testApplicationComparator() assertTrue(appComparator.compare(app1, app3) < 0); assertTrue(appComparator.compare(app2, app3) < 0); } - - @Test - public void testConcurrentAccessOnApplications() throws Exception { - CapacityScheduler cs = new CapacityScheduler(); - verifyConcurrentAccessOnApplications( - cs.appAttempts, FiCaSchedulerApp.class, Queue.class); - } - - public static - void verifyConcurrentAccessOnApplications( - final Map applications, Class appClazz, - final Class queueClazz) - throws Exception { - final int size = 10000; - final ApplicationId appId = ApplicationId.newInstance(0, 0); - final Constructor ctor = appClazz.getDeclaredConstructor( - ApplicationAttemptId.class, String.class, queueClazz, - ActiveUsersManager.class, RMContext.class); - - ApplicationAttemptId appAttemptId0 - = ApplicationAttemptId.newInstance(appId, 0); - applications.put(appAttemptId0, ctor.newInstance( - appAttemptId0, null, mock(queueClazz), null, null)); - assertNotNull(applications.get(appAttemptId0)); - - // Imitating the thread of scheduler that will add and remove apps - final AtomicBoolean finished = new AtomicBoolean(false); - final AtomicBoolean failed = new AtomicBoolean(false); - Thread t = new Thread() { - - @Override - public void run() { - for (int i = 1; i <= size; ++i) { - ApplicationAttemptId appAttemptId - = ApplicationAttemptId.newInstance(appId, i); - try { - applications.put(appAttemptId, ctor.newInstance( - appAttemptId, null, mock(queueClazz), null, null)); - } catch (Exception e) { - failed.set(true); - finished.set(true); - return; - } - } - for (int i = 1; i <= size; ++i) { - ApplicationAttemptId appAttemptId - = ApplicationAttemptId.newInstance(appId, i); - applications.remove(appAttemptId); - } - finished.set(true); - } - }; - t.start(); - - // Imitating the thread of rmappattempt that will get the app - while (!finished.get()) { - assertNotNull(applications.get(appAttemptId0)); - } - assertFalse(failed.get()); - } @Test public void testGetAppsInQueue() throws Exception { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index 5e272de..ebe4b5f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -63,6 +63,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; @@ -345,11 +347,16 @@ public void testAppAttemptMetrics() throws Exception { .getMockApplicationAttemptId(0, 1); FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a, null, rmContext); - a.submitApplicationAttempt(app_0, user_0); - - when(cs.getApplication(appAttemptId_0)).thenReturn(app_0); + AppAddedSchedulerEvent addAppEvent = + new AppAddedSchedulerEvent(appAttemptId_0.getApplicationId(), + a.getQueueName(), user_0); + cs.handle(addAppEvent); + AppAttemptAddedSchedulerEvent addAttemptEvent = + new AppAttemptAddedSchedulerEvent(appAttemptId_0); + cs.handle(addAttemptEvent); + AppAttemptRemovedSchedulerEvent event = new AppAttemptRemovedSchedulerEvent( - appAttemptId_0, RMAppAttemptState.FAILED); + appAttemptId_0, RMAppAttemptState.FAILED, true); cs.handle(event); assertEquals(0, a.getMetrics().getAppsPending()); @@ -365,9 +372,8 @@ public void testAppAttemptMetrics() throws Exception { assertEquals(1, a.getMetrics().getAppsSubmitted()); assertEquals(1, a.getMetrics().getAppsPending()); - when(cs.getApplication(appAttemptId_1)).thenReturn(app_0); event = new AppAttemptRemovedSchedulerEvent(appAttemptId_0, - RMAppAttemptState.FINISHED); + RMAppAttemptState.FINISHED, true); cs.handle(event); assertEquals(1, a.getMetrics().getAppsSubmitted()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index a7ad979..ce38396 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -590,7 +590,7 @@ public void testSimpleContainerReservation() throws Exception { // Make sure queue 2 is waiting with a reservation assertEquals(0, scheduler.getQueueManager().getQueue("queue2"). getResourceUsage().getMemory()); - assertEquals(1024, scheduler.appAttempts.get(attId).getCurrentReservation().getMemory()); + assertEquals(1024, scheduler.getSchedulerApp(attId).getCurrentReservation().getMemory()); // Now another node checks in with capacity RMNode node2 = @@ -606,10 +606,10 @@ public void testSimpleContainerReservation() throws Exception { getResourceUsage().getMemory()); // The old reservation should still be there... - assertEquals(1024, scheduler.appAttempts.get(attId).getCurrentReservation().getMemory()); + assertEquals(1024, scheduler.getSchedulerApp(attId).getCurrentReservation().getMemory()); // ... but it should disappear when we update the first node. scheduler.handle(updateEvent); - assertEquals(0, scheduler.appAttempts.get(attId).getCurrentReservation().getMemory()); + assertEquals(0, scheduler.getSchedulerApp(attId).getCurrentReservation().getMemory()); } @@ -622,7 +622,7 @@ public void testUserAsDefaultQueue() throws Exception { ApplicationAttemptId appAttemptId = createAppAttemptId(1, 1); RMApp rmApp = new RMAppImpl(appAttemptId.getApplicationId(), rmContext, conf, null, null, null, ApplicationSubmissionContext.newInstance(null, null, - null, null, null, false, false, 0, null, null), null, null, 0, null); + null, null, null, false, false, 0, null, null, true), null, null, 0, null); appsMap.put(appAttemptId.getApplicationId(), rmApp); AppAddedSchedulerEvent appAddedEvent = @@ -648,7 +648,7 @@ public void testNotUserAsDefaultQueue() throws Exception { ApplicationAttemptId appAttemptId = createAppAttemptId(1, 1); RMApp rmApp = new RMAppImpl(appAttemptId.getApplicationId(), rmContext, conf, null, null, null, ApplicationSubmissionContext.newInstance(null, null, - null, null, null, false, false, 0, null, null), null, null, 0, null); + null, null, null, false, false, 0, null, null, true), null, null, 0, null); appsMap.put(appAttemptId.getApplicationId(), rmApp); AppAddedSchedulerEvent appAddedEvent = @@ -723,17 +723,17 @@ public void testQueuePlacementWithPolicy() throws Exception { scheduler.getAllocationConfiguration().placementPolicy = new QueuePlacementPolicy(rules, queues, conf); appId = createSchedulingRequest(1024, "somequeue", "user1"); - assertEquals("root.somequeue", apps.get(appId).getQueueName()); + assertEquals("root.somequeue", scheduler.getSchedulerApp(appId).getQueueName()); appId = createSchedulingRequest(1024, "default", "user1"); - assertEquals("root.user1", apps.get(appId).getQueueName()); + assertEquals("root.user1", scheduler.getSchedulerApp(appId).getQueueName()); appId = createSchedulingRequest(1024, "default", "user3"); - assertEquals("root.user3group", apps.get(appId).getQueueName()); + assertEquals("root.user3group", scheduler.getSchedulerApp(appId).getQueueName()); appId = createSchedulingRequest(1024, "default", "user4"); - assertEquals("root.user4subgroup1", apps.get(appId).getQueueName()); + assertEquals("root.user4subgroup1", scheduler.getSchedulerApp(appId).getQueueName()); appId = createSchedulingRequest(1024, "default", "user5"); - assertEquals("root.user5subgroup2", apps.get(appId).getQueueName()); + assertEquals("root.user5subgroup2", scheduler.getSchedulerApp(appId).getQueueName()); appId = createSchedulingRequest(1024, "default", "otheruser"); - assertEquals("root.default", apps.get(appId).getQueueName()); + assertEquals("root.default", scheduler.getSchedulerApp(appId).getQueueName()); // test without specified as first rule rules = new ArrayList(); @@ -743,11 +743,11 @@ public void testQueuePlacementWithPolicy() throws Exception { scheduler.getAllocationConfiguration().placementPolicy = new QueuePlacementPolicy(rules, queues, conf); appId = createSchedulingRequest(1024, "somequeue", "user1"); - assertEquals("root.user1", apps.get(appId).getQueueName()); + assertEquals("root.user1", scheduler.getSchedulerApp(appId).getQueueName()); appId = createSchedulingRequest(1024, "somequeue", "otheruser"); - assertEquals("root.somequeue", apps.get(appId).getQueueName()); + assertEquals("root.somequeue", scheduler.getSchedulerApp(appId).getQueueName()); appId = createSchedulingRequest(1024, "default", "otheruser"); - assertEquals("root.default", apps.get(appId).getQueueName()); + assertEquals("root.default", scheduler.getSchedulerApp(appId).getQueueName()); } @Test @@ -865,7 +865,7 @@ public void testAppAdditionAndRemoval() throws Exception { .getRunnableAppSchedulables().size()); AppAttemptRemovedSchedulerEvent appRemovedEvent1 = new AppAttemptRemovedSchedulerEvent( - createAppAttemptId(1, 1), RMAppAttemptState.FINISHED); + createAppAttemptId(1, 1), RMAppAttemptState.FINISHED, true); // Now remove app scheduler.handle(appRemovedEvent1); @@ -1138,12 +1138,12 @@ public void testChoiceOfPreemptedContainers() throws Exception { scheduler.handle(nodeUpdate3); } - assertEquals(1, scheduler.appAttempts.get(app1).getLiveContainers().size()); - assertEquals(1, scheduler.appAttempts.get(app2).getLiveContainers().size()); - assertEquals(1, scheduler.appAttempts.get(app3).getLiveContainers().size()); - assertEquals(1, scheduler.appAttempts.get(app4).getLiveContainers().size()); - assertEquals(1, scheduler.appAttempts.get(app5).getLiveContainers().size()); - assertEquals(1, scheduler.appAttempts.get(app6).getLiveContainers().size()); + assertEquals(1, scheduler.getSchedulerApp(app1).getLiveContainers().size()); + assertEquals(1, scheduler.getSchedulerApp(app2).getLiveContainers().size()); + assertEquals(1, scheduler.getSchedulerApp(app3).getLiveContainers().size()); + assertEquals(1, scheduler.getSchedulerApp(app4).getLiveContainers().size()); + assertEquals(1, scheduler.getSchedulerApp(app5).getLiveContainers().size()); + assertEquals(1, scheduler.getSchedulerApp(app6).getLiveContainers().size()); // Now new requests arrive from queues C and D ApplicationAttemptId app7 = @@ -1166,16 +1166,16 @@ public void testChoiceOfPreemptedContainers() throws Exception { // Make sure it is lowest priority container. scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(), Resources.createResource(2 * 1024)); - assertEquals(1, scheduler.appAttempts.get(app1).getLiveContainers().size()); - assertEquals(1, scheduler.appAttempts.get(app2).getLiveContainers().size()); - assertEquals(1, scheduler.appAttempts.get(app4).getLiveContainers().size()); - assertEquals(1, scheduler.appAttempts.get(app5).getLiveContainers().size()); + assertEquals(1, scheduler.getSchedulerApp(app1).getLiveContainers().size()); + assertEquals(1, scheduler.getSchedulerApp(app2).getLiveContainers().size()); + assertEquals(1, scheduler.getSchedulerApp(app4).getLiveContainers().size()); + assertEquals(1, scheduler.getSchedulerApp(app5).getLiveContainers().size()); // First verify we are adding containers to preemption list for the application - assertTrue(!Collections.disjoint(scheduler.appAttempts.get(app3).getLiveContainers(), - scheduler.appAttempts.get(app3).getPreemptionContainers())); - assertTrue(!Collections.disjoint(scheduler.appAttempts.get(app6).getLiveContainers(), - scheduler.appAttempts.get(app6).getPreemptionContainers())); + assertTrue(!Collections.disjoint(scheduler.getSchedulerApp(app3).getLiveContainers(), + scheduler.getSchedulerApp(app3).getPreemptionContainers())); + assertTrue(!Collections.disjoint(scheduler.getSchedulerApp(app6).getLiveContainers(), + scheduler.getSchedulerApp(app6).getPreemptionContainers())); // Pretend 15 seconds have passed clock.tick(15); @@ -1185,8 +1185,8 @@ public void testChoiceOfPreemptedContainers() throws Exception { Resources.createResource(2 * 1024)); // At this point the containers should have been killed (since we are not simulating AM) - assertEquals(0, scheduler.appAttempts.get(app6).getLiveContainers().size()); - assertEquals(0, scheduler.appAttempts.get(app3).getLiveContainers().size()); + assertEquals(0, scheduler.getSchedulerApp(app6).getLiveContainers().size()); + assertEquals(0, scheduler.getSchedulerApp(app3).getLiveContainers().size()); // Trigger a kill by insisting we want containers back scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(), @@ -1200,22 +1200,22 @@ public void testChoiceOfPreemptedContainers() throws Exception { scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(), Resources.createResource(2 * 1024)); - assertEquals(1, scheduler.appAttempts.get(app1).getLiveContainers().size()); - assertEquals(0, scheduler.appAttempts.get(app2).getLiveContainers().size()); - assertEquals(0, scheduler.appAttempts.get(app3).getLiveContainers().size()); - assertEquals(1, scheduler.appAttempts.get(app4).getLiveContainers().size()); - assertEquals(0, scheduler.appAttempts.get(app5).getLiveContainers().size()); - assertEquals(0, scheduler.appAttempts.get(app6).getLiveContainers().size()); + assertEquals(1, scheduler.getSchedulerApp(app1).getLiveContainers().size()); + assertEquals(0, scheduler.getSchedulerApp(app2).getLiveContainers().size()); + assertEquals(0, scheduler.getSchedulerApp(app3).getLiveContainers().size()); + assertEquals(1, scheduler.getSchedulerApp(app4).getLiveContainers().size()); + assertEquals(0, scheduler.getSchedulerApp(app5).getLiveContainers().size()); + assertEquals(0, scheduler.getSchedulerApp(app6).getLiveContainers().size()); // Now A and B are below fair share, so preemption shouldn't do anything scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(), Resources.createResource(2 * 1024)); - assertEquals(1, scheduler.appAttempts.get(app1).getLiveContainers().size()); - assertEquals(0, scheduler.appAttempts.get(app2).getLiveContainers().size()); - assertEquals(0, scheduler.appAttempts.get(app3).getLiveContainers().size()); - assertEquals(1, scheduler.appAttempts.get(app4).getLiveContainers().size()); - assertEquals(0, scheduler.appAttempts.get(app5).getLiveContainers().size()); - assertEquals(0, scheduler.appAttempts.get(app6).getLiveContainers().size()); + assertEquals(1, scheduler.getSchedulerApp(app1).getLiveContainers().size()); + assertEquals(0, scheduler.getSchedulerApp(app2).getLiveContainers().size()); + assertEquals(0, scheduler.getSchedulerApp(app3).getLiveContainers().size()); + assertEquals(1, scheduler.getSchedulerApp(app4).getLiveContainers().size()); + assertEquals(0, scheduler.getSchedulerApp(app5).getLiveContainers().size()); + assertEquals(0, scheduler.getSchedulerApp(app6).getLiveContainers().size()); } @Test (timeout = 5000) @@ -1374,9 +1374,9 @@ public void testMultipleContainersWaitingForReservation() throws IOException { // One container should get reservation and the other should get nothing assertEquals(1024, - scheduler.appAttempts.get(attId1).getCurrentReservation().getMemory()); + scheduler.getSchedulerApp(attId1).getCurrentReservation().getMemory()); assertEquals(0, - scheduler.appAttempts.get(attId2).getCurrentReservation().getMemory()); + scheduler.getSchedulerApp(attId2).getCurrentReservation().getMemory()); } @Test (timeout = 5000) @@ -1411,7 +1411,7 @@ public void testUserMaxRunningApps() throws Exception { scheduler.handle(updateEvent); // App 1 should be running - assertEquals(1, scheduler.appAttempts.get(attId1).getLiveContainers().size()); + assertEquals(1, scheduler.getSchedulerApp(attId1).getLiveContainers().size()); ApplicationAttemptId attId2 = createSchedulingRequest(1024, "queue1", "user1", 1); @@ -1420,7 +1420,7 @@ public void testUserMaxRunningApps() throws Exception { scheduler.handle(updateEvent); // App 2 should not be running - assertEquals(0, scheduler.appAttempts.get(attId2).getLiveContainers().size()); + assertEquals(0, scheduler.getSchedulerApp(attId2).getLiveContainers().size()); // Request another container for app 1 createSchedulingRequestExistingApplication(1024, 1, attId1); @@ -1429,7 +1429,7 @@ public void testUserMaxRunningApps() throws Exception { scheduler.handle(updateEvent); // Request should be fulfilled - assertEquals(2, scheduler.appAttempts.get(attId1).getLiveContainers().size()); + assertEquals(2, scheduler.getSchedulerApp(attId1).getLiveContainers().size()); } @Test (timeout = 5000) @@ -1449,10 +1449,10 @@ public void testReservationWhileMultiplePriorities() throws IOException { NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1); scheduler.handle(updateEvent); - FSSchedulerApp app = scheduler.appAttempts.get(attId); + FSSchedulerApp app = scheduler.getSchedulerApp(attId); assertEquals(1, app.getLiveContainers().size()); - ContainerId containerId = scheduler.appAttempts.get(attId) + ContainerId containerId = scheduler.getSchedulerApp(attId) .getLiveContainers().iterator().next().getContainerId(); // Cause reservation to be created @@ -1521,9 +1521,9 @@ public void testAclSubmitApplication() throws Exception { ApplicationAttemptId attId2 = createSchedulingRequest(1024, "queue1", "norealuserhasthisname2", 1); - FSSchedulerApp app1 = scheduler.appAttempts.get(attId1); + FSSchedulerApp app1 = scheduler.getSchedulerApp(attId1); assertNotNull("The application was not allowed", app1); - FSSchedulerApp app2 = scheduler.appAttempts.get(attId2); + FSSchedulerApp app2 = scheduler.getSchedulerApp(attId2); assertNull("The application was allowed", app2); } @@ -1566,14 +1566,14 @@ public void testMultipleNodesSingleRackRequest() throws Exception { NodeUpdateSchedulerEvent updateEvent1 = new NodeUpdateSchedulerEvent(node1); scheduler.handle(updateEvent1); // should assign node local - assertEquals(1, scheduler.appAttempts.get(appId).getLiveContainers().size()); + assertEquals(1, scheduler.getSchedulerApp(appId).getLiveContainers().size()); // node 2 checks in scheduler.update(); NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2); scheduler.handle(updateEvent2); // should assign rack local - assertEquals(2, scheduler.appAttempts.get(appId).getLiveContainers().size()); + assertEquals(2, scheduler.getSchedulerApp(appId).getLiveContainers().size()); } @Test (timeout = 5000) @@ -1592,8 +1592,8 @@ public void testFifoWithinQueue() throws Exception { "user1", 2); ApplicationAttemptId attId2 = createSchedulingRequest(1024, "queue1", "user1", 2); - FSSchedulerApp app1 = scheduler.appAttempts.get(attId1); - FSSchedulerApp app2 = scheduler.appAttempts.get(attId2); + FSSchedulerApp app1 = scheduler.getSchedulerApp(attId1); + FSSchedulerApp app2 = scheduler.getSchedulerApp(attId2); FSLeafQueue queue1 = scheduler.getQueueManager().getLeafQueue("queue1", true); queue1.setPolicy(new FifoPolicy()); @@ -1633,7 +1633,7 @@ public void testMaxAssign() throws Exception { ApplicationAttemptId attId = createSchedulingRequest(1024, "root.default", "user", 8); - FSSchedulerApp app = scheduler.appAttempts.get(attId); + FSSchedulerApp app = scheduler.getSchedulerApp(attId); // set maxAssign to 2: only 2 containers should be allocated scheduler.maxAssign = 2; @@ -1695,10 +1695,10 @@ public void testAssignContainer() throws Exception { ApplicationAttemptId attId4 = createSchedulingRequest(1024, fifoQueue, user, 4); - FSSchedulerApp app1 = scheduler.appAttempts.get(attId1); - FSSchedulerApp app2 = scheduler.appAttempts.get(attId2); - FSSchedulerApp app3 = scheduler.appAttempts.get(attId3); - FSSchedulerApp app4 = scheduler.appAttempts.get(attId4); + FSSchedulerApp app1 = scheduler.getSchedulerApp(attId1); + FSSchedulerApp app2 = scheduler.getSchedulerApp(attId2); + FSSchedulerApp app3 = scheduler.getSchedulerApp(attId3); + FSSchedulerApp app4 = scheduler.getSchedulerApp(attId4); scheduler.getQueueManager().getLeafQueue(fifoQueue, true) .setPolicy(SchedulingPolicy.parse("fifo")); @@ -1813,7 +1813,7 @@ public void testReservationThatDoesntFit() throws IOException { NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1); scheduler.handle(updateEvent); - FSSchedulerApp app = scheduler.appAttempts.get(attId); + FSSchedulerApp app = scheduler.getSchedulerApp(attId); assertEquals(0, app.getLiveContainers().size()); assertEquals(0, app.getReservedContainers().size()); @@ -1882,7 +1882,7 @@ public void testStrictLocality() throws IOException { NodeUpdateSchedulerEvent node2UpdateEvent = new NodeUpdateSchedulerEvent(node2); // no matter how many heartbeats, node2 should never get a container - FSSchedulerApp app = scheduler.appAttempts.get(attId1); + FSSchedulerApp app = scheduler.getSchedulerApp(attId1); for (int i = 0; i < 10; i++) { scheduler.handle(node2UpdateEvent); assertEquals(0, app.getLiveContainers().size()); @@ -1921,7 +1921,7 @@ public void testCancelStrictLocality() throws IOException { NodeUpdateSchedulerEvent node2UpdateEvent = new NodeUpdateSchedulerEvent(node2); // no matter how many heartbeats, node2 should never get a container - FSSchedulerApp app = scheduler.appAttempts.get(attId1); + FSSchedulerApp app = scheduler.getSchedulerApp(attId1); for (int i = 0; i < 10; i++) { scheduler.handle(node2UpdateEvent); assertEquals(0, app.getLiveContainers().size()); @@ -1954,7 +1954,7 @@ public void testReservationsStrictLocality() throws IOException { ApplicationAttemptId attId = createSchedulingRequest(1024, "queue1", "user1", 0); - FSSchedulerApp app = scheduler.appAttempts.get(attId); + FSSchedulerApp app = scheduler.getSchedulerApp(attId); ResourceRequest nodeRequest = createResourceRequest(1024, node2.getHostName(), 1, 2, true); ResourceRequest rackRequest = createResourceRequest(1024, "rack1", 1, 2, true); @@ -1994,7 +1994,7 @@ public void testNoMoreCpuOnNode() throws IOException { ApplicationAttemptId attId = createSchedulingRequest(1024, 1, "default", "user1", 2); - FSSchedulerApp app = scheduler.appAttempts.get(attId); + FSSchedulerApp app = scheduler.getSchedulerApp(attId); scheduler.update(); NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1); @@ -2014,10 +2014,10 @@ public void testBasicDRFAssignment() throws Exception { ApplicationAttemptId appAttId1 = createSchedulingRequest(2048, 1, "queue1", "user1", 2); - FSSchedulerApp app1 = scheduler.appAttempts.get(appAttId1); + FSSchedulerApp app1 = scheduler.getSchedulerApp(appAttId1); ApplicationAttemptId appAttId2 = createSchedulingRequest(1024, 2, "queue1", "user1", 2); - FSSchedulerApp app2 = scheduler.appAttempts.get(appAttId2); + FSSchedulerApp app2 = scheduler.getSchedulerApp(appAttId2); DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy(); drfPolicy.initialize(scheduler.getClusterCapacity()); @@ -2055,13 +2055,13 @@ public void testBasicDRFWithQueues() throws Exception { ApplicationAttemptId appAttId1 = createSchedulingRequest(3072, 1, "queue1", "user1", 2); - FSSchedulerApp app1 = scheduler.appAttempts.get(appAttId1); + FSSchedulerApp app1 = scheduler.getSchedulerApp(appAttId1); ApplicationAttemptId appAttId2 = createSchedulingRequest(2048, 2, "queue1", "user1", 2); - FSSchedulerApp app2 = scheduler.appAttempts.get(appAttId2); + FSSchedulerApp app2 = scheduler.getSchedulerApp(appAttId2); ApplicationAttemptId appAttId3 = createSchedulingRequest(1024, 2, "queue2", "user1", 2); - FSSchedulerApp app3 = scheduler.appAttempts.get(appAttId3); + FSSchedulerApp app3 = scheduler.getSchedulerApp(appAttId3); DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy(); drfPolicy.initialize(scheduler.getClusterCapacity()); @@ -2092,19 +2092,19 @@ public void testDRFHierarchicalQueues() throws Exception { ApplicationAttemptId appAttId1 = createSchedulingRequest(3074, 1, "queue1.subqueue1", "user1", 2); Thread.sleep(3); // so that start times will be different - FSSchedulerApp app1 = scheduler.appAttempts.get(appAttId1); + FSSchedulerApp app1 = scheduler.getSchedulerApp(appAttId1); ApplicationAttemptId appAttId2 = createSchedulingRequest(1024, 3, "queue1.subqueue1", "user1", 2); Thread.sleep(3); // so that start times will be different - FSSchedulerApp app2 = scheduler.appAttempts.get(appAttId2); + FSSchedulerApp app2 = scheduler.getSchedulerApp(appAttId2); ApplicationAttemptId appAttId3 = createSchedulingRequest(2048, 2, "queue1.subqueue2", "user1", 2); Thread.sleep(3); // so that start times will be different - FSSchedulerApp app3 = scheduler.appAttempts.get(appAttId3); + FSSchedulerApp app3 = scheduler.getSchedulerApp(appAttId3); ApplicationAttemptId appAttId4 = createSchedulingRequest(1024, 2, "queue2", "user1", 2); Thread.sleep(3); // so that start times will be different - FSSchedulerApp app4 = scheduler.appAttempts.get(appAttId4); + FSSchedulerApp app4 = scheduler.getSchedulerApp(appAttId4); DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy(); drfPolicy.initialize(scheduler.getClusterCapacity()); @@ -2184,7 +2184,7 @@ public void testHostPortNodeName() throws Exception { NodeUpdateSchedulerEvent(node2); // no matter how many heartbeats, node2 should never get a container - FSSchedulerApp app = scheduler.appAttempts.get(attId1); + FSSchedulerApp app = scheduler.getSchedulerApp(attId1); for (int i = 0; i < 10; i++) { scheduler.handle(node2UpdateEvent); assertEquals(0, app.getLiveContainers().size()); @@ -2195,16 +2195,8 @@ public void testHostPortNodeName() throws Exception { assertEquals(1, app.getLiveContainers().size()); } - @Test - public void testConcurrentAccessOnApplications() throws Exception { - FairScheduler fs = new FairScheduler(); - TestCapacityScheduler.verifyConcurrentAccessOnApplications( - fs.appAttempts, FSSchedulerApp.class, FSLeafQueue.class); - } - - private void verifyAppRunnable(ApplicationAttemptId attId, boolean runnable) { - FSSchedulerApp app = scheduler.appAttempts.get(attId); + FSSchedulerApp app = scheduler.getSchedulerApp(attId); FSLeafQueue queue = app.getQueue(); Collection runnableApps = queue.getRunnableAppSchedulables(); @@ -2260,7 +2252,7 @@ public void testUserAndQueueMaxRunningApps() throws Exception { // Remove app 1 and both app 2 and app 4 should becomes runnable in its place AppAttemptRemovedSchedulerEvent appRemovedEvent1 = - new AppAttemptRemovedSchedulerEvent(attId1, RMAppAttemptState.FINISHED); + new AppAttemptRemovedSchedulerEvent(attId1, RMAppAttemptState.FINISHED, true); scheduler.handle(appRemovedEvent1); verifyAppRunnable(attId2, true); verifyQueueNumRunnable("queue2", 1, 0); @@ -2324,7 +2316,7 @@ public void testMaxRunningAppsHierarchicalQueues() throws Exception { // Even though the app was removed from sub3, the app from sub2 gets to go // because it came in first AppAttemptRemovedSchedulerEvent appRemovedEvent1 = - new AppAttemptRemovedSchedulerEvent(attId2, RMAppAttemptState.FINISHED); + new AppAttemptRemovedSchedulerEvent(attId2, RMAppAttemptState.FINISHED, true); scheduler.handle(appRemovedEvent1); verifyAppRunnable(attId4, true); verifyQueueNumRunnable("queue1.sub2", 2, 0); @@ -2333,7 +2325,7 @@ public void testMaxRunningAppsHierarchicalQueues() throws Exception { // Now test removal of a non-runnable app AppAttemptRemovedSchedulerEvent appRemovedEvent2 = - new AppAttemptRemovedSchedulerEvent(attId5, RMAppAttemptState.KILLED); + new AppAttemptRemovedSchedulerEvent(attId5, RMAppAttemptState.KILLED, true); scheduler.handle(appRemovedEvent2); assertEquals(0, scheduler.maxRunningEnforcer.usersNonRunnableApps .get("user1").size()); @@ -2341,7 +2333,7 @@ public void testMaxRunningAppsHierarchicalQueues() throws Exception { verifyQueueNumRunnable("queue1.sub3", 0, 0); // verify it doesn't become runnable when there would be space for it AppAttemptRemovedSchedulerEvent appRemovedEvent3 = - new AppAttemptRemovedSchedulerEvent(attId4, RMAppAttemptState.FINISHED); + new AppAttemptRemovedSchedulerEvent(attId4, RMAppAttemptState.FINISHED, true); scheduler.handle(appRemovedEvent3); verifyQueueNumRunnable("queue1.sub2", 1, 0); verifyQueueNumRunnable("queue1.sub3", 0, 0); @@ -2389,7 +2381,7 @@ public void testContinuousScheduling() throws Exception { // at least one pass Thread.sleep(fs.getConf().getContinuousSchedulingSleepMs() + 500); - FSSchedulerApp app = fs.appAttempts.get(appAttemptId); + FSSchedulerApp app = fs.getSchedulerApp(appAttemptId); // Wait until app gets resources. while (app.getCurrentConsumption().equals(Resources.none())) { } @@ -2477,7 +2469,7 @@ public void testBlacklistNodes() throws Exception { ApplicationAttemptId appAttemptId = createSchedulingRequest(GB, "root.default", "user", 1); - FSSchedulerApp app = scheduler.appAttempts.get(appAttemptId); + FSSchedulerApp app = scheduler.getSchedulerApp(appAttemptId); // Verify the blacklist can be updated independent of requesting containers scheduler.allocate(appAttemptId, Collections.emptyList(), @@ -2487,7 +2479,7 @@ public void testBlacklistNodes() throws Exception { scheduler.allocate(appAttemptId, Collections.emptyList(), Collections.emptyList(), null, Collections.singletonList(host)); - assertFalse(scheduler.appAttempts.get(appAttemptId).isBlacklisted(host)); + assertFalse(scheduler.getSchedulerApp(appAttemptId).isBlacklisted(host)); List update = Arrays.asList( createResourceRequest(GB, node.getHostName(), 1, 0, true)); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java index 3057826..fc475fd 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java @@ -534,13 +534,6 @@ public void testFifoScheduler() throws Exception { LOG.info("--- END: testFifoScheduler ---"); } - @Test - public void testConcurrentAccessOnApplications() throws Exception { - FifoScheduler fs = new FifoScheduler(); - TestCapacityScheduler.verifyConcurrentAccessOnApplications( - fs.appAttempts, FiCaSchedulerApp.class, Queue.class); - } - @SuppressWarnings("resource") @Test public void testBlackListNodes() throws Exception { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java index a6ad9b6..1df14c8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java @@ -749,7 +749,7 @@ public void testAppSubmissionWithInvalidDelegationToken() throws Exception { ApplicationSubmissionContext.newInstance( ApplicationId.newInstance(1234121, 0), "BOGUS", "default", Priority.UNDEFINED, amContainer, false, - true, 1, Resource.newInstance(1024, 1), "BOGUS"); + true, 1, Resource.newInstance(1024, 1), "BOGUS", true); SubmitApplicationRequest request = SubmitApplicationRequest.newInstance(appSubContext); try { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesApps.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesApps.java index 58170ef..593759c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesApps.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesApps.java @@ -1408,6 +1408,8 @@ public void testMultipleAppAttempts() throws JSONException, Exception { rm.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); amNodeManager.nodeHeartbeat(true); } + // kick the scheduler to allocate the am container. + amNodeManager.nodeHeartbeat(true); rm.waitForState(app1.getCurrentAppAttempt().getAppAttemptId(), RMAppAttemptState.ALLOCATED); assertEquals("incorrect number of attempts", maxAppAttempts,