diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java index 110e9c8..5c91caf 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java @@ -36,12 +36,13 @@ import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent; import org.apache.hadoop.mapreduce.v2.app.rm.RMCommunicator; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; -import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException; +import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; @@ -98,11 +99,24 @@ protected synchronized void heartbeat() throws Exception { AllocateRequest.newInstance(this.lastResponseID, super.getApplicationProgress(), new ArrayList(), new ArrayList(), null); - AllocateResponse allocateResponse; try { - allocateResponse = scheduler.allocate(allocateRequest); + scheduler.allocate(allocateRequest); // Reset retry count if no exception occurred. 
retrystartTime = System.currentTimeMillis(); + } catch (ApplicationNotFoundException e) { + LOG.info("Event from RM: shutting down Application Master"); + // This can happen if the RM has been restarted. If it is in that state, + // this application must clean itself up. + eventHandler.handle(new JobEvent(this.getJob().getID(), + JobEventType.JOB_AM_REBOOT)); + throw new YarnRuntimeException( + "Resource Manager doesn't recognize AttemptId: " + + this.getContext().getApplicationID(), e); + } catch (ApplicationMasterNotRegisteredException e) { + LOG.info("ApplicationMaster is out of sync with ResourceManager," + + " hence resync and send outstanding requests."); + this.lastResponseID = 0; + register(); } catch (Exception e) { // This can happen when the connection to the RM has gone down. Keep // re-trying until the retryInterval has expired. @@ -117,29 +131,6 @@ protected synchronized void heartbeat() throws Exception { // continue to attempt to contact the RM. throw e; } - if (allocateResponse.getAMCommand() != null) { - switch(allocateResponse.getAMCommand()) { - case AM_RESYNC: - LOG.info("ApplicationMaster is out of sync with ResourceManager," - + " hence resyncing."); - this.lastResponseID = 0; - register(); - break; - case AM_SHUTDOWN: - LOG.info("Event from RM: shutting down Application Master"); - // This can happen if the RM has been restarted. If it is in that state, - // this application must clean itself up. 
- eventHandler.handle(new JobEvent(this.getJob().getID(), - JobEventType.JOB_AM_REBOOT)); - throw new YarnRuntimeException("Resource Manager doesn't recognize AttemptId: " + - this.getContext().getApplicationID()); - default: - String msg = - "Unhandled value of AMCommand: " + allocateResponse.getAMCommand(); - LOG.error(msg); - throw new YarnRuntimeException(msg); - } - } } @SuppressWarnings("unchecked") diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java index 307cdfe..d2e1c3d 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java @@ -71,6 +71,8 @@ import org.apache.hadoop.yarn.api.records.PreemptionMessage; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.client.api.NMTokenCache; +import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException; +import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.util.Clock; @@ -239,7 +241,7 @@ public void run() { protected synchronized void heartbeat() throws Exception { scheduleStats.updateAndLogIfChanged("Before Scheduling: "); List allocatedContainers = getResources(); - if (allocatedContainers.size() > 0) { + if (allocatedContainers != null && allocatedContainers.size() > 0) { scheduledRequests.assign(allocatedContainers); } @@ -615,7 +617,7 @@ public void 
rampDownReduces(int rampDown) { private List getResources() throws Exception { int headRoom = getAvailableResources() != null ? getAvailableResources().getMemory() : 0;//first time it would be null - AllocateResponse response; + AllocateResponse response = null; /* * If contact with RM is lost, the AM will wait MR_AM_TO_RM_WAIT_INTERVAL_MS * milliseconds before aborting. During this interval, AM will still try @@ -625,6 +627,21 @@ public void rampDownReduces(int rampDown) { response = makeRemoteRequest(); // Reset retry count if no exception occurred. retrystartTime = System.currentTimeMillis(); + } catch (ApplicationNotFoundException e ) { + // This can happen if the RM has been restarted. If it is in that state, + // this application must clean itself up. + eventHandler.handle(new JobEvent(this.getJob().getID(), + JobEventType.JOB_AM_REBOOT)); + throw new YarnRuntimeException( + "Resource Manager doesn't recognize AttemptId: " + + this.getContext().getApplicationID(), e); + } catch (ApplicationMasterNotRegisteredException e) { + LOG.info("ApplicationMaster is out of sync with ResourceManager," + + " hence resync and send outstanding requests."); + // RM may have restarted, re-register with RM. + register(); + addOutstandingRequestOnResync(); + return null; } catch (Exception e) { // This can happen when the connection to the RM has gone down. Keep // re-trying until the retryInterval has expired. @@ -639,32 +656,6 @@ public void rampDownReduces(int rampDown) { // continue to attempt to contact the RM. throw e; } - if (response.getAMCommand() != null) { - switch(response.getAMCommand()) { - case AM_RESYNC: - LOG.info("ApplicationMaster is out of sync with ResourceManager," - + " hence resyncing."); - lastResponseID = 0; - - // Registering to allow RM to discover an active AM for this - // application - register(); - addOutstandingRequestOnResync(); - break; - case AM_SHUTDOWN: - // This can happen if the RM has been restarted. 
If it is in that state, - // this application must clean itself up. - eventHandler.handle(new JobEvent(this.getJob().getID(), - JobEventType.JOB_AM_REBOOT)); - throw new YarnRuntimeException("Resource Manager doesn't recognize AttemptId: " + - this.getContext().getApplicationID()); - default: - String msg = - "Unhandled value of AMCommand: " + response.getAMCommand(); - LOG.error(msg); - throw new YarnRuntimeException(msg); - } - } int newHeadRoom = getAvailableResources() != null ? getAvailableResources().getMemory() : 0; List newContainers = response.getAllocatedContainers(); // Setting NMTokens diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java index 943c0af..a266e28 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java @@ -29,7 +29,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; -import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -40,7 +39,6 @@ import org.apache.hadoop.mapreduce.v2.app.client.ClientService; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; -import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Priority; import 
org.apache.hadoop.yarn.api.records.Resource; @@ -51,6 +49,8 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import com.google.common.annotations.VisibleForTesting; + /** * Keeps the data structures to send container requests to RM. @@ -176,7 +176,8 @@ protected void serviceInit(Configuration conf) throws Exception { LOG.info("blacklistDisablePercent is " + blacklistDisablePercent); } - protected AllocateResponse makeRemoteRequest() throws IOException { + protected AllocateResponse makeRemoteRequest() throws YarnException, + IOException { ResourceBlacklistRequest blacklistRequest = ResourceBlacklistRequest.newInstance(new ArrayList(blacklistAdditions), new ArrayList(blacklistRemovals)); @@ -185,15 +186,7 @@ protected AllocateResponse makeRemoteRequest() throws IOException { super.getApplicationProgress(), new ArrayList(ask), new ArrayList(release), blacklistRequest); AllocateResponse allocateResponse; - try { - allocateResponse = scheduler.allocate(allocateRequest); - } catch (YarnException e) { - throw new IOException(e); - } - - if (isResyncCommand(allocateResponse)) { - return allocateResponse; - } + allocateResponse = scheduler.allocate(allocateRequest); lastResponseID = allocateResponse.getResponseId(); availableResources = allocateResponse.getAvailableResources(); lastClusterNmCount = clusterNmCount; @@ -222,11 +215,6 @@ protected AllocateResponse makeRemoteRequest() throws IOException { return allocateResponse; } - protected boolean isResyncCommand(AllocateResponse allocateResponse) { - return allocateResponse.getAMCommand() != null - && allocateResponse.getAMCommand() == AMCommand.AM_RESYNC; - } - protected void addOutstandingRequestOnResync() { for (Map> rr : remoteRequestsTable .values()) { diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java 
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java index e554281..15207e6 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java @@ -40,12 +40,6 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.hadoop.mapreduce.v2.app.AppContext; -import org.apache.hadoop.mapreduce.v2.app.ClusterInfo; -import org.apache.hadoop.mapreduce.v2.app.ControlledClock; -import org.apache.hadoop.mapreduce.v2.app.MRApp; -import org.junit.Assert; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -56,6 +50,10 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.api.records.TaskState; import org.apache.hadoop.mapreduce.v2.api.records.TaskType; +import org.apache.hadoop.mapreduce.v2.app.AppContext; +import org.apache.hadoop.mapreduce.v2.app.ClusterInfo; +import org.apache.hadoop.mapreduce.v2.app.ControlledClock; +import org.apache.hadoop.mapreduce.v2.app.MRApp; import org.apache.hadoop.mapreduce.v2.app.client.ClientService; import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal; @@ -93,6 +91,7 @@ import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; @@ 
-112,6 +111,7 @@ import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.SystemClock; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -1757,14 +1757,11 @@ public void updateSchedulerProxy(MyResourceManager rm) { } @Override - protected AllocateResponse makeRemoteRequest() throws IOException { + protected AllocateResponse makeRemoteRequest() throws IOException, + YarnException { allocateResponse = super.makeRemoteRequest(); return allocateResponse; } - - public boolean isResyncCommand() { - return super.isResyncCommand(allocateResponse); - } } @Test @@ -2230,8 +2227,6 @@ public void testRMContainerAllocatorResendsRequestsOnRMRestart() // send allocate request to 2nd RM and get resync command allocator.schedule(); dispatcher.await(); - Assert.assertTrue("Last allocate response is not RESYNC", - allocator.isResyncCommand()); // Step-5 : On Resync,AM sends all outstanding // asks,release,blacklistAaddition diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AMCommand.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AMCommand.java index 5c9a985..f21b17d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AMCommand.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AMCommand.java @@ -20,7 +20,10 @@ import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException; /** * Command sent by the Resource Manager to the 
Application Master in the @@ -30,16 +33,26 @@ @Public @Unstable public enum AMCommand { + /** + * @deprecated * Sent by Resource Manager when it is out of sync with the AM and wants the * AM get back in sync. + * + * Note: This is deprecated. Instead, {@link ApplicationMasterNotRegisteredException} + * will be thrown when ApplicationMaster is out of sync with ResourceManager + * and ApplicationMaster is expected to re-register with RM by calling + * {@link ApplicationMasterProtocol#registerApplicationMaster(RegisterApplicationMasterRequest)} */ + @Deprecated AM_RESYNC, /** * Sent by Resource Manager when it wants the AM to shutdown. Eg. when the * node is going down for maintenance. The AM should save any state and * prepare to be restarted at a later time. + * */ + @Deprecated AM_SHUTDOWN } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/ApplicationNotFoundException.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/ApplicationNotFoundException.java index da83c39..567188b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/ApplicationNotFoundException.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/ApplicationNotFoundException.java @@ -21,13 +21,16 @@ import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.ApplicationClientProtocol; +import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; /** * This exception is thrown on - * {@link ApplicationClientProtocol#getApplicationReport - * (GetApplicationReportRequest)} API - * when the Application doesn't exist in RM and AHS + * {@link
ApplicationClientProtocol#getApplicationReport (GetApplicationReportRequest)} + * API when the application doesn't exist in RM and AHS or + * {@link ApplicationMasterProtocol#allocate(AllocateRequest)} if application + * doesn't exist in RM. */ @Public @Unstable diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java index e7659bd..fd1feee 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java @@ -31,7 +31,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; -import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; @@ -43,6 +42,8 @@ import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl; +import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException; +import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -219,9 +220,13 @@ public void run() { if (!keepRunning) { return; } - + try { response = client.allocate(progress); + } catch (ApplicationNotFoundException e) { + handler.onShutdownRequest(); + LOG.info("Shutdown 
requested. Stopping callback."); + return; } catch (Throwable ex) { LOG.error("Exception on heartbeat", ex); savedException = ex; @@ -229,21 +234,14 @@ public void run() { handlerThread.interrupt(); return; } - } - if (response != null) { - while (true) { + if (response != null) { try { responseQueue.put(response); - if (response.getAMCommand() == AMCommand.AM_SHUTDOWN) { - return; - } - break; } catch (InterruptedException ex) { LOG.debug("Interrupted while waiting to put on response queue", ex); } } } - try { Thread.sleep(heartbeatIntervalMs.get()); } catch (InterruptedException ex) { @@ -276,20 +274,6 @@ public void run() { LOG.info("Interrupted while waiting for queue", ex); continue; } - - if (response.getAMCommand() != null) { - switch(response.getAMCommand()) { - case AM_SHUTDOWN: - handler.onShutdownRequest(); - LOG.info("Shutdown requested. Stopping callback."); - return; - default: - String msg = - "Unhandled value of RM AMCommand: " + response.getAMCommand(); - LOG.error(msg); - throw new YarnRuntimeException(msg); - } - } List updatedNodes = response.getUpdatedNodes(); if (!updatedNodes.isEmpty()) { handler.onNodesUpdated(updatedNodes); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java index 1db7054..51dc94b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java @@ -47,7 +47,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; -import 
org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; @@ -271,15 +270,16 @@ public AllocateResponse allocate(float progressIndicator) blacklistRemovals.clear(); } - allocateResponse = rmClient.allocate(allocateRequest); - if (isResyncCommand(allocateResponse)) { + try { + allocateResponse = rmClient.allocate(allocateRequest); + } catch (ApplicationMasterNotRegisteredException e) { LOG.warn("ApplicationMaster is out of sync with ResourceManager," + " hence resyncing."); synchronized (this) { release.addAll(this.pendingRelease); blacklistAdditions.addAll(this.blacklistedNodes); for (Map> rr : remoteRequestsTable - .values()) { + .values()) { for (Map capabalities : rr.values()) { for (ResourceRequestInfo request : capabalities.values()) { addResourceRequestToAsk(request.remoteRequest); @@ -342,11 +342,6 @@ protected void removePendingReleaseRequests( } } - private boolean isResyncCommand(AllocateResponse allocateResponse) { - return allocateResponse.getAMCommand() != null - && allocateResponse.getAMCommand() == AMCommand.AM_RESYNC; - } - @Private @VisibleForTesting protected void populateNMTokens(List nmTokens) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java index 15bfa28..071f82f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java @@ -765,7 +765,7 @@ public AllocateResponse createFakeAllocateResponse() { return AllocateResponse.newInstance(-1, new ArrayList(), new ArrayList(), new ArrayList(), - 
Resource.newInstance(1024, 2), AMCommand.AM_RESYNC, 1, + Resource.newInstance(1024, 2), null, 1, null, new ArrayList()); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java index 728a558..5a591b0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java @@ -18,15 +18,15 @@ package org.apache.hadoop.yarn.client.api.async.impl; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyFloat; import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyString; -import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; import java.io.IOException; import java.util.ArrayList; @@ -35,13 +35,10 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import org.junit.Assert; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; -import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; @@ -55,8 +52,10 @@ import 
org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl; +import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.junit.Assert; import org.junit.Test; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -210,10 +209,10 @@ public void testAMRMClientAsyncShutDown() throws Exception { @SuppressWarnings("unchecked") AMRMClient client = mock(AMRMClientImpl.class); - final AllocateResponse shutDownResponse = createAllocateResponse( - new ArrayList(), new ArrayList(), null); - shutDownResponse.setAMCommand(AMCommand.AM_SHUTDOWN); - when(client.allocate(anyFloat())).thenReturn(shutDownResponse); + createAllocateResponse(new ArrayList(), + new ArrayList(), null); + when(client.allocate(anyFloat())).thenThrow( + new ApplicationNotFoundException("app not found, shut down")); AMRMClientAsync asyncClient = AMRMClientAsync.createAMRMClientAsync(client, 10, callbackHandler); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestAllocateResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestAllocateResponse.java index a8a56d7..74fd1a9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestAllocateResponse.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestAllocateResponse.java @@ -52,7 +52,7 @@ * License for the specific language governing permissions and limitations under * the License. 
*/ - +@SuppressWarnings("deprecation") public class TestAllocateResponse { @Test public void testAllocateResponseWithIncDecContainers() { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index e60add4..35cb045 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -49,7 +49,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; -import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; @@ -66,6 +65,7 @@ import org.apache.hadoop.yarn.api.records.StrictPreemptionContract; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException; +import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException; import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException; import org.apache.hadoop.yarn.exceptions.InvalidResourceBlacklistRequestException; @@ -106,18 +106,12 @@ RecordFactoryProvider.getRecordFactory(null); private 
final ConcurrentMap responseMap = new ConcurrentHashMap(); - private final AllocateResponse resync = - recordFactory.newRecordInstance(AllocateResponse.class); - private final AllocateResponse shutdown = - recordFactory.newRecordInstance(AllocateResponse.class); private final RMContext rmContext; public ApplicationMasterService(RMContext rmContext, YarnScheduler scheduler) { super(ApplicationMasterService.class.getName()); this.amLivelinessMonitor = rmContext.getAMLivelinessMonitor(); this.rScheduler = scheduler; - this.shutdown.setAMCommand(AMCommand.AM_SHUTDOWN); - this.resync.setAMCommand(AMCommand.AM_RESYNC); this.rmContext = rmContext; } @@ -412,36 +406,37 @@ public AllocateResponse allocate(AllocateRequest request) /* check if its in cache */ AllocateResponseLock lock = responseMap.get(appAttemptId); if (lock == null) { - LOG.error("AppAttemptId doesnt exist in cache " + appAttemptId); - return shutdown; + String message = + "Application attempt " + appAttemptId + + " doesn't exist in ApplicationMasterService cache."; + LOG.error(message); + throw new ApplicationNotFoundException(message); } synchronized (lock) { AllocateResponse lastResponse = lock.getAllocateResponse(); if (!hasApplicationMasterRegistered(appAttemptId)) { String message = - "Application Master is not registered for known application: " - + appAttemptId.getApplicationId() - + ". Let AM resync."; + "AM is not registered for known application attempt: " + appAttemptId + + " or RM had restarted after AM registered . 
AM should re-register."; LOG.info(message); RMAuditLogger.logFailure( this.rmContext.getRMApps().get(appAttemptId.getApplicationId()) - .getUser(), AuditConstants.REGISTER_AM, "", + .getUser(), AuditConstants.AM_ALLOCATE, "", "ApplicationMasterService", message, appAttemptId.getApplicationId(), appAttemptId); - return resync; + throw new ApplicationMasterNotRegisteredException(message); } if ((request.getResponseId() + 1) == lastResponse.getResponseId()) { /* old heartbeat */ return lastResponse; } else if (request.getResponseId() + 1 < lastResponse.getResponseId()) { - LOG.error("Invalid responseid from appAttemptId " + appAttemptId); - // Oh damn! Sending reboot isn't enough. RM state is corrupted. TODO: - // Reboot is not useful since after AM reboots, it will send register - // and - // get an exception. Might as well throw an exception here. - return resync; + String message = + "Invalid responseId in AllocateRequest from application attempt: " + + appAttemptId + ", expect responseId to be " + + (lastResponse.getResponseId() + 1); + throw new InvalidApplicationMasterRequestException(message); } //filter illegal progress values diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java index 9ae09a4..3f7ef3a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java @@ -50,6 +50,7 @@ public static final String FINISH_FAILED_APP = "Application Finished - Failed"; public static final String 
FINISH_KILLED_APP = "Application Finished - Killed"; public static final String REGISTER_AM = "Register App Master"; + public static final String AM_ALLOCATE = "App Master request containers"; public static final String UNREGISTER_AM = "Unregister App Master"; public static final String ALLOC_CONTAINER = "AM Allocated Container"; public static final String RELEASE_CONTAINER = "AM Released Container"; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java index 36182f5..8c54b7b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java @@ -36,7 +36,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse; -import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; @@ -44,6 +43,8 @@ import org.apache.hadoop.yarn.api.records.SerializedException; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException; +import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; import 
org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.ipc.RPCUtil; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; @@ -195,29 +196,40 @@ public void testallocateBeforeAMRegistration() throws Exception { // request for containers int request = 2; - AllocateResponse ar = - am.allocate("h1", 1000, request, new ArrayList()); - Assert.assertTrue(ar.getAMCommand() == AMCommand.AM_RESYNC); + AllocateResponse ar = null; + boolean catchException = false; + try { + ar = am.allocate("h1", 1000, request, new ArrayList()); + } catch (ApplicationMasterNotRegisteredException e) { + catchException = true; + } + Assert.assertTrue(catchException); // kick the scheduler nm1.nodeHeartbeat(true); - AllocateResponse amrs = - am.allocate(new ArrayList(), + + catchException = false; + AllocateResponse amrs = null; + try { + amrs = am.allocate(new ArrayList(), new ArrayList()); - Assert.assertTrue(ar.getAMCommand() == AMCommand.AM_RESYNC); + } catch (ApplicationMasterNotRegisteredException e) { + catchException = true; + } + Assert.assertTrue(catchException); am.registerAppAttempt(); - thrown = false; + catchException = false; try { - am.registerAppAttempt(false); + am.registerAppAttempt(false); } catch (Exception e) { Assert.assertEquals("Application Master is already registered : " + attempt.getAppAttemptId().getApplicationId(), e.getMessage()); - thrown = true; + catchException = true; } - Assert.assertTrue(thrown); + Assert.assertTrue(catchException); // Simulate an AM that was disconnected and app attempt was removed // (responseMap does not contain attemptid) @@ -226,9 +238,13 @@ public void testallocateBeforeAMRegistration() throws Exception { ContainerState.COMPLETE); am.waitForState(RMAppAttemptState.FINISHED); - AllocateResponse amrs2 = - am.allocate(new ArrayList(), - new ArrayList()); - Assert.assertTrue(amrs2.getAMCommand() == AMCommand.AM_SHUTDOWN); + catchException = false; + try { + amrs = am.allocate(new ArrayList(), + new 
ArrayList()); + } catch (ApplicationNotFoundException e) { + catchException = true; + } + Assert.assertTrue(catchException); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index 8966af7..df95be6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -51,7 +51,6 @@ import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.service.Service.STATE; -import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; @@ -60,7 +59,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; -import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -74,6 +72,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import 
org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; @@ -290,10 +289,14 @@ public void testRMRestart() throws Exception { // verify old AM is not accepted // change running AM to talk to new RM am1.setAMRMProtocol(rm2.getApplicationMasterService()); - AllocateResponse allocResponse = am1.allocate( - new ArrayList(), - new ArrayList()); - Assert.assertEquals(AMCommand.AM_SHUTDOWN, allocResponse.getAMCommand()); + boolean catchException = false; + try { + am1.allocate(new ArrayList(), + new ArrayList()); + } catch (ApplicationNotFoundException e) { + catchException = true; + } + Assert.assertTrue(catchException); // NM should be rebooted on heartbeat, even first heartbeat for nm2 NodeHeartbeatResponse hbResponse = nm1.nodeHeartbeat(true); @@ -1672,8 +1675,6 @@ public void testQueueMetricsOnRMRestart() throws Exception { nm1.setResourceTrackerService(rm2.getResourceTrackerService()); // recover app RMApp loadedApp1 = rm2.getRMContext().getRMApps().get(app1.getApplicationId()); - am1.setAMRMProtocol(rm2.getApplicationMasterService()); - am1.allocate(new ArrayList(), new ArrayList()); nm1.nodeHeartbeat(true); nm1 = new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCResponseId.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCResponseId.java index 0628fdd..d275d17 100644 --- 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCResponseId.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCResponseId.java @@ -20,13 +20,11 @@ import java.security.PrivilegedExceptionAction; -import org.junit.Assert; - import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; -import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; @@ -35,8 +33,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; public class TestAMRMRPCResponseId { @@ -107,7 +106,14 @@ public void testARRMResponseId() throws Exception { /** try sending old request again **/ allocateRequest = AllocateRequest.newInstance(0, 0F, null, null, null); - response = allocate(attempt.getAppAttemptId(), allocateRequest); - Assert.assertTrue(response.getAMCommand() == AMCommand.AM_RESYNC); + + Throwable exception = null; + try { + allocate(attempt.getAppAttemptId(), allocateRequest); + } catch (Exception e) { + exception = e.getCause(); + } + Assert + .assertTrue(exception instanceof InvalidApplicationMasterRequestException); } }