diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/FinishApplicationMasterResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/FinishApplicationMasterResponse.java index 8de2c73..eccd38e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/FinishApplicationMasterResponse.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/FinishApplicationMasterResponse.java @@ -23,6 +23,7 @@ import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; +import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.util.Records; /** @@ -74,4 +75,21 @@ public static FinishApplicationMasterResponse newInstance( @Private @Unstable public abstract void setIsUnregistered(boolean isUnregistered); + + /** + * If the ResourceManager needs the + * ApplicationMaster to take some action then it will send an + * AMCommand to the ApplicationMaster. See AMCommand + * for details on commands and actions for them. + * @return AMCommand if the ApplicationMaster should + * take action, null otherwise + * @see org.apache.hadoop.yarn.api.records.AMCommand + */ + @Public + @Stable + public abstract AMCommand getAMCommand(); + + @Private + @Unstable + public abstract void setAMCommand(AMCommand command); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index a1f6d2e..929f7ca 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -57,6 +57,7 @@ message FinishApplicationMasterRequestProto { message FinishApplicationMasterResponseProto { optional bool isUnregistered = 1 [default = false]; + optional AMCommandProto a_m_command = 2; } message AllocateRequestProto { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/FinishApplicationMasterResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/FinishApplicationMasterResponsePBImpl.java index 1bad374..1f9d0b6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/FinishApplicationMasterResponsePBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/FinishApplicationMasterResponsePBImpl.java @@ -22,6 +22,9 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; + +import org.apache.hadoop.yarn.api.records.AMCommand; +import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.FinishApplicationMasterResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.FinishApplicationMasterResponseProtoOrBuilder; @@ -89,4 +92,23 @@ public void setIsUnregistered(boolean isUnregistered) { maybeInitBuilder(); builder.setIsUnregistered(isUnregistered); } + + @Override + public synchronized AMCommand getAMCommand() { + FinishApplicationMasterResponseProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasAMCommand()) { + return null; + } + return ProtoUtils.convertFromProtoFormat(p.getAMCommand()); + } + + @Override + public synchronized void setAMCommand(AMCommand command) { + maybeInitBuilder(); + if (command == null) { + builder.clearAMCommand(); + return; + } + builder.setAMCommand(ProtoUtils.convertToProtoFormat(command)); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index 94dc474..161fc53 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -107,13 +107,19 @@ new ConcurrentHashMap(); private final AllocateResponse resync = recordFactory.newRecordInstance(AllocateResponse.class); + private final AllocateResponse shutdown = + recordFactory.newRecordInstance(AllocateResponse.class); + private final FinishApplicationMasterResponse resyncOnFinish = + recordFactory.newRecordInstance(FinishApplicationMasterResponse.class); private final RMContext rmContext; public ApplicationMasterService(RMContext rmContext, YarnScheduler scheduler) { super(ApplicationMasterService.class.getName()); this.amLivelinessMonitor = rmContext.getAMLivelinessMonitor(); this.rScheduler = scheduler; + this.shutdown.setAMCommand(AMCommand.AM_SHUTDOWN); this.resync.setAMCommand(AMCommand.AM_RESYNC); + this.resyncOnFinish.setAMCommand(AMCommand.AM_RESYNC); this.rmContext = rmContext; } @@ -338,17 +344,12 @@ public FinishApplicationMasterResponse finishApplicationMaster( if (!hasApplicationMasterRegistered(applicationAttemptId)) { String message = "Application Master is trying to unregister before registering for: " - + applicationAttemptId.getApplicationId(); - LOG.error(message); - RMAuditLogger.logFailure( - this.rmContext.getRMApps() - .get(applicationAttemptId.getApplicationId()).getUser(), - AuditConstants.UNREGISTER_AM, "", "ApplicationMasterService", - message, applicationAttemptId.getApplicationId(), - applicationAttemptId); - throw new InvalidApplicationMasterRequestException(message); + + applicationAttemptId.getApplicationId() + + ". Let AM resync"; + LOG.info(message); + return resyncOnFinish; } - + this.amLivelinessMonitor.receivedPing(applicationAttemptId); RMApp rmApp = @@ -409,22 +410,17 @@ public AllocateResponse allocate(AllocateRequest request) AllocateResponseLock lock = responseMap.get(appAttemptId); if (lock == null) { LOG.error("AppAttemptId doesnt exist in cache " + appAttemptId); - return resync; + return shutdown; } synchronized (lock) { AllocateResponse lastResponse = lock.getAllocateResponse(); if (!hasApplicationMasterRegistered(appAttemptId)) { String message = - "Application Master is trying to allocate before registering for: " - + appAttemptId.getApplicationId(); - LOG.error(message); - RMAuditLogger.logFailure( - this.rmContext.getRMApps().get(appAttemptId.getApplicationId()) - .getUser(), AuditConstants.REGISTER_AM, "", - "ApplicationMasterService", message, - appAttemptId.getApplicationId(), - appAttemptId); - throw new InvalidApplicationMasterRequestException(message); + "Application Master is not registered for known application: " + + appAttemptId.getApplicationId() + + ". Let AM resync."; + LOG.info(message); + return resync; } if ((request.getResponseId() + 1) == lastResponse.getResponseId()) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 3318f15..3d7c8d4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -200,12 +200,11 @@ new FinalSavingTransition(FINISHED_TRANSITION, RMAppState.FINISHED)) .addTransition(RMAppState.ACCEPTED, RMAppState.KILLING, RMAppEventType.KILL, new KillAttemptTransition()) - // ACCECPTED state can once again receive APP_ACCEPTED event, because on - // recovery the app returns ACCEPTED state and the app once again go - // through the scheduler and triggers one more APP_ACCEPTED event at - // ACCEPTED state. - .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED, - RMAppEventType.APP_ACCEPTED) + .addTransition(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING, + RMAppEventType.ATTEMPT_UNREGISTERED, + new FinalSavingTransition( + new AttemptUnregisteredTransition(), + RMAppState.FINISHING, RMAppState.FINISHED)) // Transitions from RUNNING state .addTransition(RMAppState.RUNNING, RMAppState.RUNNING, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index 2a1170d..f59aeca 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -287,6 +287,9 @@ RMAppAttemptEventType.KILL, new FinalSavingTransition(new FinalTransition( RMAppAttemptState.KILLED), RMAppAttemptState.KILLED)) + .addTransition(RMAppAttemptState.LAUNCHED, + EnumSet.of(RMAppAttemptState.FINAL_SAVING, RMAppAttemptState.FINISHED), + RMAppAttemptEventType.UNREGISTERED, new AMUnregisteredTransition()) // Transitions from RUNNING State .addTransition(RMAppAttemptState.RUNNING, @@ -908,8 +911,12 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, } else { // Add the current attempt to the scheduler. if (appAttempt.rmContext.isWorkPreservingRecoveryEnabled()) { + // Need to register an app attempt before AM can register + appAttempt.masterService + .registerAppAttempt(appAttempt.applicationAttemptId); + appAttempt.eventHandler.handle(new AppAttemptAddedSchedulerEvent( - appAttempt.getAppAttemptId(), false)); + appAttempt.getAppAttemptId(), false, false)); } /* @@ -928,6 +935,7 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, * heart beat back). */ (new AMLaunchedTransition()).transition(appAttempt, event); + return RMAppAttemptState.LAUNCHED; } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 5de407d..6a4b595 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -557,7 +557,8 @@ private synchronized void addApplication(ApplicationId applicationId, private synchronized void addApplicationAttempt( ApplicationAttemptId applicationAttemptId, - boolean transferStateFromPreviousAttempt) { + boolean transferStateFromPreviousAttempt, + boolean shouldNotifyAttemptAdded) { SchedulerApplication application = applications.get(applicationAttemptId.getApplicationId()); CSQueue queue = (CSQueue) application.getQueue(); @@ -575,9 +576,13 @@ private synchronized void addApplicationAttempt( LOG.info("Added Application Attempt " + applicationAttemptId + " to scheduler from user " + application.getUser() + " in queue " + queue.getQueueName()); - rmContext.getDispatcher().getEventHandler() .handle( - new RMAppAttemptEvent(applicationAttemptId, - RMAppAttemptEventType.ATTEMPT_ADDED)); + if (shouldNotifyAttemptAdded) { + rmContext.getDispatcher().getEventHandler().handle( + new RMAppAttemptEvent(applicationAttemptId, + RMAppAttemptEventType.ATTEMPT_ADDED)); + } else { + LOG.debug("Skipping notifying ATTEMPT_ADDED"); + } } private synchronized void doneApplication(ApplicationId applicationId, @@ -911,7 +916,8 @@ public void handle(SchedulerEvent event) { AppAttemptAddedSchedulerEvent appAttemptAddedEvent = (AppAttemptAddedSchedulerEvent) event; addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(), - appAttemptAddedEvent.getTransferStateFromPreviousAttempt()); + appAttemptAddedEvent.getTransferStateFromPreviousAttempt(), + appAttemptAddedEvent.getShouldNotifyAttemptAdded()); } break; case APP_ATTEMPT_REMOVED: diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java index d31010d..64d308a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java @@ -24,13 +24,22 @@ private final ApplicationAttemptId applicationAttemptId; private final boolean transferStateFromPreviousAttempt; + private final boolean shouldNotifyAttemptAdded; public AppAttemptAddedSchedulerEvent( ApplicationAttemptId applicationAttemptId, boolean transferStateFromPreviousAttempt) { + this(applicationAttemptId, transferStateFromPreviousAttempt, true); + } + + public AppAttemptAddedSchedulerEvent( + ApplicationAttemptId applicationAttemptId, + boolean transferStateFromPreviousAttempt, + boolean shouldNotifyAttemptAdded) { super(SchedulerEventType.APP_ATTEMPT_ADDED); this.applicationAttemptId = applicationAttemptId; this.transferStateFromPreviousAttempt = transferStateFromPreviousAttempt; + this.shouldNotifyAttemptAdded = shouldNotifyAttemptAdded; } public ApplicationAttemptId getApplicationAttemptId() { @@ -40,4 +49,8 @@ public ApplicationAttemptId getApplicationAttemptId() { public boolean getTransferStateFromPreviousAttempt() { return transferStateFromPreviousAttempt; } + + public boolean getShouldNotifyAttemptAdded() { + return shouldNotifyAttemptAdded; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index ea53165..c3502be 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -597,7 +597,8 @@ protected synchronized void addApplication(ApplicationId applicationId, */ protected synchronized void addApplicationAttempt( ApplicationAttemptId applicationAttemptId, - boolean transferStateFromPreviousAttempt) { + boolean transferStateFromPreviousAttempt, + boolean shouldNotifyAttemptAdded) { SchedulerApplication application = applications.get(applicationAttemptId.getApplicationId()); String user = application.getUser(); @@ -625,9 +626,14 @@ protected synchronized void addApplicationAttempt( LOG.info("Added Application Attempt " + applicationAttemptId + " to scheduler from user: " + user); - rmContext.getDispatcher().getEventHandler().handle( - new RMAppAttemptEvent(applicationAttemptId, - RMAppAttemptEventType.ATTEMPT_ADDED)); + + if (shouldNotifyAttemptAdded) { + rmContext.getDispatcher().getEventHandler().handle( + new RMAppAttemptEvent(applicationAttemptId, + RMAppAttemptEventType.ATTEMPT_ADDED)); + } else { + LOG.debug("Skipping notifying ATTEMPT_ADDED"); + } } /** @@ -1131,7 +1137,8 @@ public void handle(SchedulerEvent event) { AppAttemptAddedSchedulerEvent appAttemptAddedEvent = (AppAttemptAddedSchedulerEvent) event; addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(), - appAttemptAddedEvent.getTransferStateFromPreviousAttempt()); + appAttemptAddedEvent.getTransferStateFromPreviousAttempt(), + appAttemptAddedEvent.getShouldNotifyAttemptAdded()); break; case APP_ATTEMPT_REMOVED: if (!(event instanceof AppAttemptRemovedSchedulerEvent)) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 4681516..7b4ebe7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -370,7 +370,8 @@ public synchronized void addApplication(ApplicationId applicationId, @VisibleForTesting public synchronized void addApplicationAttempt(ApplicationAttemptId appAttemptId, - boolean transferStateFromPreviousAttempt) { + boolean transferStateFromPreviousAttempt, + boolean shouldNotifyAttemptAdded) { SchedulerApplication application = applications.get(appAttemptId.getApplicationId()); String user = application.getUser(); @@ -388,9 +389,13 @@ public synchronized void addApplication(ApplicationId applicationId, metrics.submitAppAttempt(user); LOG.info("Added Application Attempt " + appAttemptId + " to scheduler from user " + application.getUser()); - rmContext.getDispatcher().getEventHandler().handle( - new RMAppAttemptEvent(appAttemptId, - RMAppAttemptEventType.ATTEMPT_ADDED)); + if (shouldNotifyAttemptAdded) { + rmContext.getDispatcher().getEventHandler().handle( + new RMAppAttemptEvent(appAttemptId, + RMAppAttemptEventType.ATTEMPT_ADDED)); + } else { + LOG.debug("Skipping notifying ATTEMPT_ADDED"); + } } private synchronized void doneApplication(ApplicationId applicationId, @@ -780,7 +785,8 @@ public void handle(SchedulerEvent event) { AppAttemptAddedSchedulerEvent appAttemptAddedEvent = (AppAttemptAddedSchedulerEvent) event; addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(), - appAttemptAddedEvent.getTransferStateFromPreviousAttempt()); + appAttemptAddedEvent.getTransferStateFromPreviousAttempt(), + appAttemptAddedEvent.getShouldNotifyAttemptAdded()); } break; case APP_ATTEMPT_REMOVED: diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java index cfd05f9..f4313c0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java @@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -228,7 +229,7 @@ public void unregisterAppAttempt() throws Exception { unregisterAppAttempt(req,true); } - public void unregisterAppAttempt(final FinishApplicationMasterRequest req, + public FinishApplicationMasterResponse unregisterAppAttempt(final FinishApplicationMasterRequest req, boolean waitForStateRunning) throws Exception { if (waitForStateRunning) { waitForState(RMAppAttemptState.RUNNING); @@ -239,11 +240,13 @@ public void unregisterAppAttempt(final FinishApplicationMasterRequest req, context.getRMApps().get(attemptId.getApplicationId()) .getRMAppAttempt(attemptId).getAMRMToken(); ugi.addTokenIdentifier(token.decodeIdentifier()); - ugi.doAs(new PrivilegedExceptionAction() { + return ugi.doAs + (new PrivilegedExceptionAction() { @Override - public Object run() throws Exception { - amRMProtocol.finishApplicationMaster(req); - return null; + public FinishApplicationMasterResponse run() throws Exception { + FinishApplicationMasterResponse response = amRMProtocol + .finishApplicationMaster(req); + return response; } }); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java index 64e5cc9..36182f5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java @@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse; +import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; @@ -194,28 +195,17 @@ public void testallocateBeforeAMRegistration() throws Exception { // request for containers int request = 2; - try { - AllocateResponse ar = - am.allocate("h1", 1000, request, new ArrayList()); - } catch (Exception e) { - Assert.assertEquals("Application Master is trying to allocate before " - + "registering for: " + attempt.getAppAttemptId().getApplicationId(), - e.getMessage()); - thrown = true; - } + AllocateResponse ar = + am.allocate("h1", 1000, request, new ArrayList()); + Assert.assertTrue(ar.getAMCommand() == AMCommand.AM_RESYNC); + // kick the scheduler nm1.nodeHeartbeat(true); - try { - AllocateResponse amrs = - am.allocate(new ArrayList(), - new ArrayList()); - } catch (Exception e) { - Assert.assertEquals("Application Master is trying to allocate before " - + "registering for: " + attempt.getAppAttemptId().getApplicationId(), - e.getMessage()); - thrown = true; - } - Assert.assertTrue(thrown); + AllocateResponse amrs = + am.allocate(new ArrayList(), + new ArrayList()); + Assert.assertTrue(ar.getAMCommand() == AMCommand.AM_RESYNC); + am.registerAppAttempt(); thrown = false; try { @@ -228,5 +218,17 @@ public void testallocateBeforeAMRegistration() throws Exception { thrown = true; } Assert.assertTrue(thrown); + + // Simulate an AM that was disconnected and app attempt was removed + // (responseMap does not contain attemptid) + am.unregisterAppAttempt(); + nm1.nodeHeartbeat(attempt.getAppAttemptId(), 1, + ContainerState.COMPLETE); + am.waitForState(RMAppAttemptState.FINISHED); + + AllocateResponse amrs2 = + am.allocate(new ArrayList(), + new ArrayList()); + Assert.assertTrue(amrs2.getAMCommand() == AMCommand.AM_SHUTDOWN); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java index afe28aa..713c83b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java @@ -19,6 +19,8 @@ package org.apache.hadoop.yarn.server.resourcemanager; import com.google.common.collect.Maps; +import org.apache.hadoop.yarn.api.protocolrecords + .FinishApplicationMasterResponse; import org.junit.Assert; import org.apache.commons.logging.Log; @@ -263,20 +265,16 @@ public void testFinishApplicationMasterBeforeRegistering() throws Exception { FinishApplicationMasterRequest.newInstance( FinalApplicationStatus.FAILED, "", ""); Throwable cause = null; - try { - am1.unregisterAppAttempt(req, false); - } catch (Exception e) { - cause = e.getCause(); - } - Assert.assertNotNull(cause); - Assert - .assertTrue(cause instanceof InvalidApplicationMasterRequestException); - Assert.assertNotNull(cause.getMessage()); - Assert - .assertTrue(cause - .getMessage() - .contains( - "Application Master is trying to unregister before registering for:")); + FinishApplicationMasterResponse response = am1.unregisterAppAttempt(req, + false); + Assert.assertEquals(AMCommand.AM_RESYNC, response.getAMCommand()); + + am1.registerAppAttempt(); + + response = am1.unregisterAppAttempt(req,false); + Assert.assertNotEquals(AMCommand.AM_RESYNC, response.getAMCommand()); + + am1.waitForState(RMAppAttemptState.FINISHING); } finally { if (rm != null) { rm.stop(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java index b7b77a7..aa7f631 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java @@ -238,7 +238,7 @@ public void testNodeUpdateBeforeAppAttemptInit() throws Exception { } ApplicationAttemptId attId = ApplicationAttemptId.newInstance(appId, 1); - scheduler.addApplicationAttempt(attId, false); + scheduler.addApplicationAttempt(attId, false, true); rm.stop(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index 9c2d87e..dcd8eb6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -293,7 +293,7 @@ public void testRMRestart() throws Exception { AllocateResponse allocResponse = am1.allocate( new ArrayList(), new ArrayList()); - Assert.assertTrue(allocResponse.getAMCommand() == AMCommand.AM_RESYNC); + Assert.assertEquals(AMCommand.AM_SHUTDOWN, allocResponse.getAMCommand()); // NM should be rebooted on heartbeat, even first heartbeat for nm2 NodeHeartbeatResponse hbResponse = nm1.nodeHeartbeat(true); @@ -706,6 +706,45 @@ public void testRMRestartFailedApp() throws Exception { rm2.stop(); } + @Test (timeout = 600000) + public void testRMRestartWorkPreservingAppReregister() throws Exception { + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1); + conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, + true); + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + + // start RM + MockRM rm1 = new MockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); + nm1.registerNode(); + + // create app and launch the AM + RMApp app0 = rm1.submitApp(200); + MockAM am0 = launchAM(app0, rm1, nm1); + + nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 1, ContainerState.RUNNING); + am0.waitForState(RMAppAttemptState.RUNNING); + rm1.waitForState(app0.getApplicationId(), RMAppState.RUNNING); + + // start new RM + MockRM rm2 = new MockRM(conf, memStore); + rm2.start(); + rm2.waitForState(app0.getApplicationId(), RMAppState.ACCEPTED); + rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.LAUNCHED); + + am0.setAMRMProtocol(rm2.getApplicationMasterService()); + am0.registerAppAttempt(false); + + rm2.waitForState(app0.getApplicationId(), RMAppState.RUNNING); + rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.RUNNING); + + rm1.stop(); + rm2.stop(); + } + @Test (timeout = 60000) public void testRMRestartKilledApp() throws Exception{ conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, @@ -1648,7 +1687,7 @@ public void testQueueMetricsOnRMRestart() throws Exception { rm1.waitForState(attemptId1, RMAppAttemptState.ALLOCATED); MockAM am1 = rm1.sendAMLaunched(attempt1.getAppAttemptId()); am1.registerAppAttempt(); - am1.allocate("127.0.0.1" , 1000, 1, new ArrayList()); + am1.allocate("127.0.0.1" , 1000, 1, new ArrayList()); nm1.nodeHeartbeat(true); List conts = am1.allocate(new ArrayList(), new ArrayList()).getAllocatedContainers(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java index fb864a2..dd1e4bb 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java @@ -146,7 +146,7 @@ protected ApplicationAttemptId createSchedulingRequest( // This conditional is for testAclSubmitApplication where app is rejected // and no app is added. if (scheduler.getSchedulerApplications().containsKey(id.getApplicationId())) { - scheduler.addApplicationAttempt(id, false); + scheduler.addApplicationAttempt(id, false, true); } List ask = new ArrayList(); ResourceRequest request = createResourceRequest(memory, vcores, ResourceRequest.ANY, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 9d8b1d1..c6a2231 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -787,13 +787,13 @@ public void testQueueDemandCalculation() throws Exception { ApplicationAttemptId id11 = createAppAttemptId(1, 1); scheduler.addApplication(id11.getApplicationId(), "root.queue1", "user1"); - scheduler.addApplicationAttempt(id11, false); + scheduler.addApplicationAttempt(id11, false, true); ApplicationAttemptId id21 = createAppAttemptId(2, 1); scheduler.addApplication(id21.getApplicationId(), "root.queue2", "user1"); - scheduler.addApplicationAttempt(id21, false); + scheduler.addApplicationAttempt(id21, false, true); ApplicationAttemptId id22 = createAppAttemptId(2, 2); scheduler.addApplication(id22.getApplicationId(), "root.queue2", "user1"); - scheduler.addApplicationAttempt(id22, false); + scheduler.addApplicationAttempt(id22, false, true); int minReqSize = FairSchedulerConfiguration.DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB; @@ -1555,7 +1555,7 @@ public void testMultipleNodesSingleRackRequest() throws Exception { ApplicationAttemptId appId = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++); scheduler.addApplication(appId.getApplicationId(), "queue1", "user1"); - scheduler.addApplicationAttempt(appId, false); + scheduler.addApplicationAttempt(appId, false, true); // 1 request with 2 nodes on the same rack. another request with 1 node on // a different rack @@ -2593,7 +2593,7 @@ public void testContinuousScheduling() throws Exception { ApplicationAttemptId appAttemptId = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++); fs.addApplication(appAttemptId.getApplicationId(), "queue11", "user11"); - fs.addApplicationAttempt(appAttemptId, false); + fs.addApplicationAttempt(appAttemptId, false, true); List ask = new ArrayList(); ResourceRequest request = createResourceRequest(1024, 1, ResourceRequest.ANY, 1, 1, true);