diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/InvalidApplicationMasterRequestException.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/InvalidApplicationMasterRequestException.java index 6b422b3..6fe6f27 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/InvalidApplicationMasterRequestException.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/InvalidApplicationMasterRequestException.java @@ -24,10 +24,8 @@ /** * This exception is thrown when an ApplicationMaster asks for resources by - * calling {@link ApplicationMasterProtocol#allocate(AllocateRequest)} or tries - * to unregister by calling - * {@link ApplicationMasterProtocol#finishApplicationMaster(FinishApplicationMasterRequest)} - * API without first registering by calling + * calling {@link ApplicationMasterProtocol#allocate(AllocateRequest)} + * without first registering by calling * {@link ApplicationMasterProtocol#registerApplicationMaster(RegisterApplicationMasterRequest)} * or if it tries to register more than once. */ diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/UnregisteredApplicationMasterException.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/UnregisteredApplicationMasterException.java new file mode 100644 index 0000000..ebf47ba --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/UnregisteredApplicationMasterException.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.exceptions; + +import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; + +/** + * This exception is thrown when an Application Master tries to unregister by calling + * {@link ApplicationMasterProtocol#finishApplicationMaster(FinishApplicationMasterRequest)} + * API without first registering by calling + * {@link ApplicationMasterProtocol#registerApplicationMaster(org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest)} + */ + +public class UnregisteredApplicationMasterException extends YarnException { + + private static final long serialVersionUID = 13498238L; + + public UnregisteredApplicationMasterException(Throwable cause) { super(cause);} + + public UnregisteredApplicationMasterException(String message) { super(message); } + + public UnregisteredApplicationMasterException(String message, Throwable cause) { + super(message, cause); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index 94dc474..14afed8 100644 --- 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -69,6 +69,7 @@ import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException; import org.apache.hadoop.yarn.exceptions.InvalidResourceBlacklistRequestException; import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; +import org.apache.hadoop.yarn.exceptions.UnregisteredApplicationMasterException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; @@ -107,12 +108,15 @@ new ConcurrentHashMap(); private final AllocateResponse resync = recordFactory.newRecordInstance(AllocateResponse.class); + private final AllocateResponse shutdown = + recordFactory.newRecordInstance(AllocateResponse.class); private final RMContext rmContext; public ApplicationMasterService(RMContext rmContext, YarnScheduler scheduler) { super(ApplicationMasterService.class.getName()); this.amLivelinessMonitor = rmContext.getAMLivelinessMonitor(); this.rScheduler = scheduler; + this.shutdown.setAMCommand(AMCommand.AM_SHUTDOWN); this.resync.setAMCommand(AMCommand.AM_RESYNC); this.rmContext = rmContext; } @@ -346,9 +350,9 @@ public FinishApplicationMasterResponse finishApplicationMaster( AuditConstants.UNREGISTER_AM, "", "ApplicationMasterService", message, applicationAttemptId.getApplicationId(), applicationAttemptId); - throw new InvalidApplicationMasterRequestException(message); + throw new UnregisteredApplicationMasterException(message); } - + this.amLivelinessMonitor.receivedPing(applicationAttemptId); RMApp rmApp = @@ -409,22 +413,17 @@ public AllocateResponse allocate(AllocateRequest 
request) AllocateResponseLock lock = responseMap.get(appAttemptId); if (lock == null) { LOG.error("AppAttemptId doesnt exist in cache " + appAttemptId); - return resync; + return shutdown; } synchronized (lock) { AllocateResponse lastResponse = lock.getAllocateResponse(); if (!hasApplicationMasterRegistered(appAttemptId)) { String message = - "Application Master is trying to allocate before registering for: " - + appAttemptId.getApplicationId(); - LOG.error(message); - RMAuditLogger.logFailure( - this.rmContext.getRMApps().get(appAttemptId.getApplicationId()) - .getUser(), AuditConstants.REGISTER_AM, "", - "ApplicationMasterService", message, - appAttemptId.getApplicationId(), - appAttemptId); - throw new InvalidApplicationMasterRequestException(message); + "Application Master is not registered for known application: " + + appAttemptId.getApplicationId() + + ". Let AM resync."; + LOG.info(message); + return resync; } if ((request.getResponseId() + 1) == lastResponse.getResponseId()) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 3f9ef64..bc90cf9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -203,12 +203,6 @@ .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED, RMAppEventType.APP_RUNNING_ON_NODE, new AppRunningOnNodeTransition()) - // ACCECPTED state can once again receive APP_ACCEPTED event, because on - // recovery the app returns ACCEPTED state and the app 
once again go - // through the scheduler and triggers one more APP_ACCEPTED event at - // ACCEPTED state. - .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED, - RMAppEventType.APP_ACCEPTED) // Transitions from RUNNING state .addTransition(RMAppState.RUNNING, RMAppState.RUNNING, @@ -815,6 +809,16 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) { RMAppTransition { @Override public void transition(RMAppImpl app, RMAppEvent event) { + transitionImplementation(app, event, true); + } + + public void transition(RMAppImpl app, RMAppEvent event, + boolean shouldSchedulerNotifyAppAdded) { + transitionImplementation(app, event, shouldSchedulerNotifyAppAdded); + } + + private void transitionImplementation(RMAppImpl app, RMAppEvent event, + boolean shouldSchedulerNotifyAppAdded) { if (event instanceof RMAppNewSavedEvent) { RMAppNewSavedEvent storeEvent = (RMAppNewSavedEvent) event; // For HA this exception needs to be handled by giving up @@ -827,7 +831,7 @@ public void transition(RMAppImpl app, RMAppEvent event) { } } app.handler.handle(new AppAddedSchedulerEvent(app.applicationId, - app.submissionContext.getQueue(), app.user)); + app.submissionContext.getQueue(), app.user, shouldSchedulerNotifyAppAdded)); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index 5e71c93..4ad7572 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -884,8 +884,12 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, } else { // Add the current attempt to the scheduler. if (appAttempt.rmContext.isWorkPreservingRecoveryEnabled()) { + // Need to register an app attempt before AM can register + appAttempt.masterService + .registerAppAttempt(appAttempt.applicationAttemptId); + appAttempt.eventHandler.handle(new AppAttemptAddedSchedulerEvent( - appAttempt.getAppAttemptId(), false)); + appAttempt.getAppAttemptId(), false, false)); } /* diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 5de407d..8e1e8fe 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -519,7 +519,8 @@ synchronized CSQueue getQueue(String queueName) { } private synchronized void addApplication(ApplicationId applicationId, - String queueName, String user) { + String queueName, String user, + boolean shouldNotifyAppAccepted) { // santiy checks. 
CSQueue queue = getQueue(queueName); if (queue == null) { @@ -551,13 +552,21 @@ private synchronized void addApplication(ApplicationId applicationId, applications.put(applicationId, application); LOG.info("Accepted application " + applicationId + " from user: " + user + ", in queue: " + queueName); - rmContext.getDispatcher().getEventHandler() - .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED)); + + if (shouldNotifyAppAccepted) { + rmContext.getDispatcher().getEventHandler() + .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED)); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Skipping notifying APP_ACCEPTED"); + } + } } private synchronized void addApplicationAttempt( ApplicationAttemptId applicationAttemptId, - boolean transferStateFromPreviousAttempt) { + boolean transferStateFromPreviousAttempt, + boolean shouldNotifyAttemptAdded) { SchedulerApplication application = applications.get(applicationAttemptId.getApplicationId()); CSQueue queue = (CSQueue) application.getQueue(); @@ -575,9 +584,15 @@ private synchronized void addApplicationAttempt( LOG.info("Added Application Attempt " + applicationAttemptId + " to scheduler from user " + application.getUser() + " in queue " + queue.getQueueName()); - rmContext.getDispatcher().getEventHandler() .handle( - new RMAppAttemptEvent(applicationAttemptId, - RMAppAttemptEventType.ATTEMPT_ADDED)); + if (shouldNotifyAttemptAdded) { + rmContext.getDispatcher().getEventHandler().handle( + new RMAppAttemptEvent(applicationAttemptId, + RMAppAttemptEventType.ATTEMPT_ADDED)); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Skipping notifying ATTEMPT_ADDED"); + } + } } private synchronized void doneApplication(ApplicationId applicationId, @@ -896,7 +911,8 @@ public void handle(SchedulerEvent event) { { AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event; addApplication(appAddedEvent.getApplicationId(), - appAddedEvent.getQueue(), appAddedEvent.getUser()); + 
appAddedEvent.getQueue(), appAddedEvent.getUser(), + appAddedEvent.getShouldNotifyAppAccepted()); } break; case APP_REMOVED: @@ -911,7 +927,8 @@ public void handle(SchedulerEvent event) { AppAttemptAddedSchedulerEvent appAttemptAddedEvent = (AppAttemptAddedSchedulerEvent) event; addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(), - appAttemptAddedEvent.getTransferStateFromPreviousAttempt()); + appAttemptAddedEvent.getTransferStateFromPreviousAttempt(), + appAttemptAddedEvent.getShouldNotifyAttemptAdded()); } break; case APP_ATTEMPT_REMOVED: diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java index d6fb36d..7b257e0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java @@ -25,13 +25,20 @@ private final ApplicationId applicationId; private final String queue; private final String user; + private final boolean shouldNotifyAppAdded; public AppAddedSchedulerEvent( ApplicationId applicationId, String queue, String user) { + this(applicationId, queue, user, true); + } + public AppAddedSchedulerEvent( + ApplicationId applicationId, String queue, String user, + boolean shouldNotifyAppAdded) { super(SchedulerEventType.APP_ADDED); this.applicationId = applicationId; this.queue = queue; this.user = user; + this.shouldNotifyAppAdded = shouldNotifyAppAdded; } public ApplicationId getApplicationId() { 
@@ -46,4 +53,9 @@ public String getUser() { return user; } + // On recovery we don't need to notify APP_ACCEPTED again and this will be + // false + public boolean getShouldNotifyAppAccepted() { + return shouldNotifyAppAdded; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java index d31010d..64d308a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java @@ -24,13 +24,22 @@ private final ApplicationAttemptId applicationAttemptId; private final boolean transferStateFromPreviousAttempt; + private final boolean shouldNotifyAttemptAdded; public AppAttemptAddedSchedulerEvent( ApplicationAttemptId applicationAttemptId, boolean transferStateFromPreviousAttempt) { + this(applicationAttemptId, transferStateFromPreviousAttempt, true); + } + + public AppAttemptAddedSchedulerEvent( + ApplicationAttemptId applicationAttemptId, + boolean transferStateFromPreviousAttempt, + boolean shouldNotifyAttemptAdded) { super(SchedulerEventType.APP_ATTEMPT_ADDED); this.applicationAttemptId = applicationAttemptId; this.transferStateFromPreviousAttempt = transferStateFromPreviousAttempt; + this.shouldNotifyAttemptAdded = shouldNotifyAttemptAdded; } public ApplicationAttemptId getApplicationAttemptId() { @@ -40,4 +49,8 @@ public ApplicationAttemptId getApplicationAttemptId() { public boolean 
getTransferStateFromPreviousAttempt() { return transferStateFromPreviousAttempt; } + + public boolean getShouldNotifyAttemptAdded() { + return shouldNotifyAttemptAdded; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index ea53165..e6165fd 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -551,7 +551,8 @@ public FairSchedulerEventLog getEventLog() { * configured limits, but the app will not be marked as runnable. 
*/ protected synchronized void addApplication(ApplicationId applicationId, - String queueName, String user) { + String queueName, String user, + boolean shouldNotifyAppAccepted) { if (queueName == null || queueName.isEmpty()) { String message = "Reject application " + applicationId + " submitted by user " + user + " with an empty queue name."; @@ -588,8 +589,14 @@ protected synchronized void addApplication(ApplicationId applicationId, LOG.info("Accepted application " + applicationId + " from user: " + user + ", in queue: " + queueName + ", currently num of applications: " + applications.size()); - rmContext.getDispatcher().getEventHandler() - .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED)); + if (shouldNotifyAppAccepted) { + rmContext.getDispatcher().getEventHandler() + .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED)); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Skipping notifying APP_ACCEPTED"); + } + } } /** @@ -597,7 +604,8 @@ protected synchronized void addApplication(ApplicationId applicationId, */ protected synchronized void addApplicationAttempt( ApplicationAttemptId applicationAttemptId, - boolean transferStateFromPreviousAttempt) { + boolean transferStateFromPreviousAttempt, + boolean shouldNotifyAttemptAdded) { SchedulerApplication application = applications.get(applicationAttemptId.getApplicationId()); String user = application.getUser(); @@ -625,9 +633,16 @@ protected synchronized void addApplicationAttempt( LOG.info("Added Application Attempt " + applicationAttemptId + " to scheduler from user: " + user); - rmContext.getDispatcher().getEventHandler().handle( - new RMAppAttemptEvent(applicationAttemptId, - RMAppAttemptEventType.ATTEMPT_ADDED)); + + if (shouldNotifyAttemptAdded) { + rmContext.getDispatcher().getEventHandler().handle( + new RMAppAttemptEvent(applicationAttemptId, + RMAppAttemptEventType.ATTEMPT_ADDED)); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Skipping notifying 
ATTEMPT_ADDED"); + } + } } /** @@ -1114,7 +1129,8 @@ public void handle(SchedulerEvent event) { } AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event; addApplication(appAddedEvent.getApplicationId(), - appAddedEvent.getQueue(), appAddedEvent.getUser()); + appAddedEvent.getQueue(), appAddedEvent.getUser(), + appAddedEvent.getShouldNotifyAppAccepted()); break; case APP_REMOVED: if (!(event instanceof AppRemovedSchedulerEvent)) { @@ -1131,7 +1147,8 @@ public void handle(SchedulerEvent event) { AppAttemptAddedSchedulerEvent appAttemptAddedEvent = (AppAttemptAddedSchedulerEvent) event; addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(), - appAttemptAddedEvent.getTransferStateFromPreviousAttempt()); + appAttemptAddedEvent.getTransferStateFromPreviousAttempt(), + appAttemptAddedEvent.getShouldNotifyAttemptAdded()); break; case APP_ATTEMPT_REMOVED: if (!(event instanceof AppAttemptRemovedSchedulerEvent)) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 4681516..d526069 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -356,21 +356,29 @@ private FiCaSchedulerNode getNode(NodeId nodeId) { @VisibleForTesting public synchronized void addApplication(ApplicationId applicationId, - String queue, String user) { + String queue, String user, boolean + shouldNotifyAppAccepted) { SchedulerApplication 
application = new SchedulerApplication(DEFAULT_QUEUE, user); applications.put(applicationId, application); metrics.submitApp(user); LOG.info("Accepted application " + applicationId + " from user: " + user + ", currently num of applications: " + applications.size()); - rmContext.getDispatcher().getEventHandler() - .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED)); + if (shouldNotifyAppAccepted) { + rmContext.getDispatcher().getEventHandler() + .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED)); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Skipping notifying APP_ACCEPTED"); + } + } } @VisibleForTesting public synchronized void addApplicationAttempt(ApplicationAttemptId appAttemptId, - boolean transferStateFromPreviousAttempt) { + boolean transferStateFromPreviousAttempt, + boolean shouldNotifyAttemptAdded) { SchedulerApplication application = applications.get(appAttemptId.getApplicationId()); String user = application.getUser(); @@ -388,9 +396,15 @@ public synchronized void addApplication(ApplicationId applicationId, metrics.submitAppAttempt(user); LOG.info("Added Application Attempt " + appAttemptId + " to scheduler from user " + application.getUser()); - rmContext.getDispatcher().getEventHandler().handle( - new RMAppAttemptEvent(appAttemptId, - RMAppAttemptEventType.ATTEMPT_ADDED)); + if (shouldNotifyAttemptAdded) { + rmContext.getDispatcher().getEventHandler().handle( + new RMAppAttemptEvent(appAttemptId, + RMAppAttemptEventType.ATTEMPT_ADDED)); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Skipping notifying ATTEMPT_ADDED"); + } + } } private synchronized void doneApplication(ApplicationId applicationId, @@ -765,7 +779,8 @@ public void handle(SchedulerEvent event) { { AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event; addApplication(appAddedEvent.getApplicationId(), - appAddedEvent.getQueue(), appAddedEvent.getUser()); + appAddedEvent.getQueue(), appAddedEvent.getUser(), + 
appAddedEvent.getShouldNotifyAppAccepted()); } break; case APP_REMOVED: @@ -780,7 +795,8 @@ public void handle(SchedulerEvent event) { AppAttemptAddedSchedulerEvent appAttemptAddedEvent = (AppAttemptAddedSchedulerEvent) event; addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(), - appAttemptAddedEvent.getTransferStateFromPreviousAttempt()); + appAttemptAddedEvent.getTransferStateFromPreviousAttempt(), + appAttemptAddedEvent.getShouldNotifyAttemptAdded()); } break; case APP_ATTEMPT_REMOVED: diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java index 64e5cc9..36182f5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java @@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse; +import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; @@ -194,28 +195,17 @@ public void testallocateBeforeAMRegistration() throws Exception { // request for containers int request = 2; - try { - AllocateResponse ar = - am.allocate("h1", 1000, request, new ArrayList()); - } catch (Exception e) { - 
Assert.assertEquals("Application Master is trying to allocate before " - + "registering for: " + attempt.getAppAttemptId().getApplicationId(), - e.getMessage()); - thrown = true; - } + AllocateResponse ar = + am.allocate("h1", 1000, request, new ArrayList()); + Assert.assertTrue(ar.getAMCommand() == AMCommand.AM_RESYNC); + // kick the scheduler nm1.nodeHeartbeat(true); - try { - AllocateResponse amrs = - am.allocate(new ArrayList(), - new ArrayList()); - } catch (Exception e) { - Assert.assertEquals("Application Master is trying to allocate before " - + "registering for: " + attempt.getAppAttemptId().getApplicationId(), - e.getMessage()); - thrown = true; - } - Assert.assertTrue(thrown); + AllocateResponse amrs = + am.allocate(new ArrayList(), + new ArrayList()); + Assert.assertTrue(amrs.getAMCommand() == AMCommand.AM_RESYNC); + am.registerAppAttempt(); thrown = false; try { @@ -228,5 +218,17 @@ public void testallocateBeforeAMRegistration() throws Exception { thrown = true; } Assert.assertTrue(thrown); + + // Simulate an AM that was disconnected and app attempt was removed + // (responseMap does not contain attemptid) + am.unregisterAppAttempt(); + nm1.nodeHeartbeat(attempt.getAppAttemptId(), 1, + ContainerState.COMPLETE); + am.waitForState(RMAppAttemptState.FINISHED); + + AllocateResponse amrs2 = + am.allocate(new ArrayList(), + new ArrayList()); + Assert.assertTrue(amrs2.getAMCommand() == AMCommand.AM_SHUTDOWN); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java index afe28aa..f023ec0 100644 --- 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager; import com.google.common.collect.Maps; +import org.apache.hadoop.yarn.exceptions.UnregisteredApplicationMasterException; import org.junit.Assert; import org.apache.commons.logging.Log; @@ -270,13 +271,17 @@ public void testFinishApplicationMasterBeforeRegistering() throws Exception { } Assert.assertNotNull(cause); Assert - .assertTrue(cause instanceof InvalidApplicationMasterRequestException); + .assertTrue(cause instanceof UnregisteredApplicationMasterException); Assert.assertNotNull(cause.getMessage()); Assert .assertTrue(cause .getMessage() .contains( "Application Master is trying to unregister before registering for:")); + + am1.registerAppAttempt(); + + am1.unregisterAppAttempt(req, false); } finally { if (rm != null) { rm.stop(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java index b7b77a7..b8b4133 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java @@ -228,7 +228,7 @@ public void testNodeUpdateBeforeAppAttemptInit() throws Exception { 
scheduler.handle(new NodeAddedSchedulerEvent(node)); ApplicationId appId = ApplicationId.newInstance(0, 1); - scheduler.addApplication(appId, "queue1", "user1"); + scheduler.addApplication(appId, "queue1", "user1", true); NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node); try { @@ -238,7 +238,7 @@ public void testNodeUpdateBeforeAppAttemptInit() throws Exception { } ApplicationAttemptId attId = ApplicationAttemptId.newInstance(appId, 1); - scheduler.addApplicationAttempt(attId, false); + scheduler.addApplicationAttempt(attId, false, true); rm.stop(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index 8eed4e6..0e730b0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -292,7 +292,7 @@ public void testRMRestart() throws Exception { AllocateResponse allocResponse = am1.allocate( new ArrayList(), new ArrayList()); - Assert.assertTrue(allocResponse.getAMCommand() == AMCommand.AM_RESYNC); + Assert.assertEquals(AMCommand.AM_SHUTDOWN, allocResponse.getAMCommand()); // NM should be rebooted on heartbeat, even first heartbeat for nm2 NodeHeartbeatResponse hbResponse = nm1.nodeHeartbeat(true); @@ -941,7 +941,7 @@ protected RMAppManager createRMAppManager() { rm2.stop(); } - private MockAM launchAM(RMApp app, MockRM rm, MockNM nm) + public static MockAM launchAM(RMApp app, MockRM rm, MockNM nm) throws Exception { RMAppAttempt attempt = app.getCurrentAppAttempt(); 
nm.nodeHeartbeat(true); @@ -1647,7 +1647,7 @@ public void testQueueMetricsOnRMRestart() throws Exception { rm1.waitForState(attemptId1, RMAppAttemptState.ALLOCATED); MockAM am1 = rm1.sendAMLaunched(attempt1.getAppAttemptId()); am1.registerAppAttempt(); - am1.allocate("127.0.0.1" , 1000, 1, new ArrayList()); + am1.allocate("127.0.0.1" , 1000, 1, new ArrayList()); nm1.nodeHeartbeat(true); List conts = am1.allocate(new ArrayList(), new ArrayList()).getAllocatedContainers(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java index 6dd2992..a79218f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java @@ -535,6 +535,41 @@ public void testContainersNotRecoveredForCompletedApps() throws Exception { assertNull(scheduler.getRMContainer(completedContainer.getContainerId())); } + @Test (timeout = 600000) + public void testRMRestartWorkPreservingAppReregister() throws Exception { + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1); + conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, + true); + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + + // start RM + MockRM rm1 = new MockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); + nm1.registerNode(); + + // create app and launch the AM + RMApp app0 = rm1.submitApp(200); + MockAM 
am0 = TestRMRestart.launchAM(app0, rm1, nm1); + + // start new RM + MockRM rm2 = new MockRM(conf, memStore); + rm2.start(); + rm2.waitForState(app0.getApplicationId(), RMAppState.ACCEPTED); + rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.LAUNCHED); + + am0.setAMRMProtocol(rm2.getApplicationMasterService()); + am0.registerAppAttempt(false); + + rm2.waitForState(app0.getApplicationId(), RMAppState.RUNNING); + rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.RUNNING); + + rm1.stop(); + rm2.stop(); + } + private void asserteMetrics(QueueMetrics qm, int appsSubmitted, int appsPending, int appsRunning, int appsCompleted, int allocatedContainers, int availableMB, int availableVirtualCores, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java index fb864a2..023ed87 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java @@ -142,11 +142,11 @@ protected ApplicationAttemptId createSchedulingRequest( int memory, int vcores, String queueId, String userId, int numContainers, int priority) { ApplicationAttemptId id = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++); - scheduler.addApplication(id.getApplicationId(), queueId, userId); + scheduler.addApplication(id.getApplicationId(), queueId, userId, true); // This conditional is for testAclSubmitApplication where app is rejected // 
and no app is added. if (scheduler.getSchedulerApplications().containsKey(id.getApplicationId())) { - scheduler.addApplicationAttempt(id, false); + scheduler.addApplicationAttempt(id, false, true); } List ask = new ArrayList(); ResourceRequest request = createResourceRequest(memory, vcores, ResourceRequest.ANY, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 9d8b1d1..32cb0bb 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -786,14 +786,16 @@ public void testQueueDemandCalculation() throws Exception { scheduler.reinitialize(conf, resourceManager.getRMContext()); ApplicationAttemptId id11 = createAppAttemptId(1, 1); - scheduler.addApplication(id11.getApplicationId(), "root.queue1", "user1"); - scheduler.addApplicationAttempt(id11, false); + scheduler.addApplication(id11.getApplicationId(), "root.queue1", "user1", true); + scheduler.addApplicationAttempt(id11, false, true); ApplicationAttemptId id21 = createAppAttemptId(2, 1); - scheduler.addApplication(id21.getApplicationId(), "root.queue2", "user1"); - scheduler.addApplicationAttempt(id21, false); + scheduler.addApplication(id21.getApplicationId(), "root.queue2", "user1", + true); + scheduler.addApplicationAttempt(id21, false, true); ApplicationAttemptId id22 = createAppAttemptId(2, 2); - scheduler.addApplication(id22.getApplicationId(), "root.queue2", 
"user1"); - scheduler.addApplicationAttempt(id22, false); + scheduler.addApplication(id22.getApplicationId(), "root.queue2", "user1", + true); + scheduler.addApplicationAttempt(id22, false, true); int minReqSize = FairSchedulerConfiguration.DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB; @@ -1554,8 +1556,8 @@ public void testMultipleNodesSingleRackRequest() throws Exception { scheduler.handle(nodeEvent2); ApplicationAttemptId appId = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++); - scheduler.addApplication(appId.getApplicationId(), "queue1", "user1"); - scheduler.addApplicationAttempt(appId, false); + scheduler.addApplication(appId.getApplicationId(), "queue1", "user1", true); + scheduler.addApplicationAttempt(appId, false, true); // 1 request with 2 nodes on the same rack. another request with 1 node on // a different rack @@ -1801,7 +1803,7 @@ public void testNotAllowSubmitApplication() throws Exception { ApplicationAttemptId attId = ApplicationAttemptId.newInstance(applicationId, this.ATTEMPT_ID++); - scheduler.addApplication(attId.getApplicationId(), queue, user); + scheduler.addApplication(attId.getApplicationId(), queue, user, true); numTries = 0; while (application.getFinishTime() == 0 && numTries < MAX_TRIES) { @@ -2592,8 +2594,9 @@ public void testContinuousScheduling() throws Exception { // send application request ApplicationAttemptId appAttemptId = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++); - fs.addApplication(appAttemptId.getApplicationId(), "queue11", "user11"); - fs.addApplicationAttempt(appAttemptId, false); + fs.addApplication(appAttemptId.getApplicationId(), "queue11", "user11", + true); + fs.addApplicationAttempt(appAttemptId, false, true); List ask = new ArrayList(); ResourceRequest request = createResourceRequest(1024, 1, ResourceRequest.ANY, 1, 1, true);