Index: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java =================================================================== --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java (revision 1597004) +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java (working copy) @@ -119,6 +119,11 @@ if (allocateResponse.getAMCommand() != null) { switch(allocateResponse.getAMCommand()) { case AM_RESYNC: + LOG.info("ApplicationMaster is out of sync with ResourceManager," + + " hence resyncing."); + this.lastResponseID = 0; + register(); + break; case AM_SHUTDOWN: LOG.info("Event from RM: shutting down Application Master"); // This can happen if the RM has been restarted. If it is in that state, Index: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java =================================================================== --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (revision 1597004) +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (working copy) @@ -366,6 +366,7 @@ removed = true; assignedRequests.remove(aId); containersReleased++; + pendingRelease.add(containerId); release(containerId); } } @@ -586,6 +587,17 @@ if (response.getAMCommand() != null) { switch(response.getAMCommand()) { case AM_RESYNC: + if (rmWorkPreservingRestartEnabled) { + LOG.info("ApplicationMaster is out of sync with ResourceManager," + + " hence resyncing."); + lastResponseID = 0; + + // Registering to allow RM to discover an active AM for this + // application + register(); + addOutstandingAllocateRequestOnResync(); + break; + } case AM_SHUTDOWN: // This can happen if the RM has been restarted. If it is in that state, // this application must clean itself up. @@ -645,6 +657,7 @@ LOG.error("Container complete event for unknown container id " + cont.getContainerId()); } else { + pendingRelease.remove(cont.getContainerId()); assignedRequests.remove(attemptID); // send the container completed event to Task attempt @@ -932,6 +945,7 @@ private void containerNotAssigned(Container allocated) { containersReleased++; + pendingRelease.add(allocated.getId()); release(allocated.getId()); } Index: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java =================================================================== --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java (revision 1597004) +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java (working copy) @@ -38,6 +38,7 @@ import org.apache.hadoop.mapreduce.v2.app.client.ClientService; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; @@ -56,7 +57,8 @@ private static final Log LOG = LogFactory.getLog(RMContainerRequestor.class); - private int lastResponseID; + protected int lastResponseID; + protected boolean rmWorkPreservingRestartEnabled; private Resource availableResources; private final RecordFactory recordFactory = @@ -76,7 +78,7 @@ private final Set ask = new TreeSet( new org.apache.hadoop.yarn.api.records.ResourceRequest.ResourceRequestComparator()); private final Set release = new TreeSet(); - + protected Set pendingRelease = new TreeSet(); private boolean nodeBlacklistingEnabled; private int blacklistDisablePercent; private AtomicBoolean ignoreBlacklisting = new AtomicBoolean(false); @@ -131,6 +133,9 @@ @Override protected void serviceInit(Configuration conf) throws Exception { super.serviceInit(conf); + rmWorkPreservingRestartEnabled = + conf.getBoolean(MRJobConfig.MR_RM_WORKPRESERVING_RESTART_ENABLED, + MRJobConfig.DEFAULT_MR_RM_WORKPRESERVING_RESTART_ENABLED); nodeBlacklistingEnabled = conf.getBoolean(MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE, true); LOG.info("nodeBlacklistingEnabled:" + nodeBlacklistingEnabled); @@ -163,6 +168,10 @@ } catch (YarnException e) { throw new IOException(e); } + + if (rmWorkPreservingRestartEnabled && isResyncCommand(allocateResponse)) { + return allocateResponse; + } lastResponseID = allocateResponse.getResponseId(); availableResources = allocateResponse.getAvailableResources(); lastClusterNmCount = clusterNmCount; @@ -190,7 +199,29 @@ blacklistRemovals.clear(); return allocateResponse; } - + + private boolean isResyncCommand(AllocateResponse allocateResponse) { + return allocateResponse.getAMCommand() != null + && allocateResponse.getAMCommand() == AMCommand.AM_RESYNC; + } + + protected void addOutstandingAllocateRequestOnResync() { + for (Map> rr : remoteRequestsTable + .values()) { + for (Map capabalities : rr.values()) { + for (ResourceRequest request : capabalities.values()) { + addResourceRequestToAsk(request); + } + } + } + if (!ignoreBlacklisting.get()) { + blacklistAdditions.addAll(blacklistedNodes); + } + if (!pendingRelease.isEmpty()) { + release.addAll(pendingRelease); + } + } + // May be incorrect if there's multiple NodeManagers running on a single host. // knownNodeCount is based on node managers, not hosts. blacklisting is // currently based on hosts. Index: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java =================================================================== --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java (revision 1597004) +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java (working copy) @@ -86,6 +86,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.Event; @@ -94,9 +95,13 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; +import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; @@ -508,6 +513,10 @@ super(conf); } + public MyResourceManager(Configuration conf, RMStateStore store) { + super(conf, store); + } + @Override public void serviceStart() throws Exception { super.serviceStart(); @@ -1603,6 +1612,10 @@ public boolean isUnregistered() { return isUnregistered; } + + public void updateSchedulerProxy(MyResourceManager rm) { + scheduler = rm.getApplicationMasterService(); + } } @Test @@ -1896,6 +1909,98 @@ Assert.assertTrue(allocator.isUnregistered()); } + @Test + public void testRMContainerOnResync() throws Exception { + + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.RECOVERY_ENABLED, "true"); + conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, + YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); + + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + + MyResourceManager rm1 = new MyResourceManager(conf,memStore); + rm1.start(); + DrainDispatcher dispatcher = (DrainDispatcher) rm1.getRMContext() + .getDispatcher(); + + // Submit the application + RMApp app = rm1.submitApp(1024); + dispatcher.await(); + + MockNM nm1 = + new MockNM("h1:1234", 15120, rm1.getResourceTrackerService()); + nm1.registerNode(); + nm1.nodeHeartbeat(true); // Node heartbeat + dispatcher.await(); + + ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() + .getAppAttemptId(); + rm1.sendAMLaunched(appAttemptId); + dispatcher.await(); + + JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); + Job mockJob = mock(Job.class); + when(mockJob.getReport()).thenReturn( + MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, + 0, 0, 0, 0, 0, 0, "jobfile", null, false, "")); + MyContainerAllocator allocator = new MyContainerAllocator(rm1, conf, + appAttemptId, mockJob); + + // create the container request + // send MAP request + ContainerRequestEvent event1 = createReq(jobId, 1, 2048, new String[] { + "h1", "h2" }, false, false); + allocator.sendRequest(event1); + + + // this tells the scheduler about the requests + // as nodes are not added, no allocations + List assigned = allocator.schedule(); + dispatcher.await(); + Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); + + // start 2nd RM is up + MyResourceManager rm2 = new MyResourceManager(conf, memStore); + rm2.start(); + nm1.setResourceTrackerService(rm2.getResourceTrackerService()); + allocator.updateSchedulerProxy(rm2); + + // NM should be rebooted on heartbeat, even first heartbeat for nm2 + NodeHeartbeatResponse hbResponse = nm1.nodeHeartbeat(true); + Assert.assertEquals(NodeAction.RESYNC, hbResponse.getNodeAction()); + + // new NM to represent NM re-register + nm1 = new MockNM("h1:1234", 10240, rm2.getResourceTrackerService()); + nm1.registerNode(); + nm1.nodeHeartbeat(true); + dispatcher.await(); + + // get resyn + allocator.schedule(); + dispatcher.await(); + + // send all outstanding request again. + assigned=allocator.schedule(); + dispatcher.await(); + + nm1.nodeHeartbeat(true); + assigned=allocator.schedule(); + dispatcher.await(); + + Assert.assertEquals("Number of container should be 1", 1, assigned.size()); + for (TaskAttemptContainerAssignedEvent assig : assigned) { + Assert.assertTrue("Assigned count not correct", + "h1".equals(assig.getContainer().getNodeId().getHost())); + } + + rm1.stop(); + rm2.stop(); + + } + public static void main(String[] args) throws Exception { TestRMContainerAllocator t = new TestRMContainerAllocator(); t.testSimple(); Index: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java =================================================================== --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (revision 1597004) +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (working copy) @@ -532,6 +532,11 @@ public static final String MR_AM_TO_RM_WAIT_INTERVAL_MS = MR_AM_PREFIX + "scheduler.connection.wait.interval-ms"; public static final int DEFAULT_MR_AM_TO_RM_WAIT_INTERVAL_MS = 360000; + + public static final String MR_RM_WORKPRESERVING_RESTART_ENABLED = + MR_AM_PREFIX + "rm.workpreservingrestart.enabled"; + public static final boolean DEFAULT_MR_RM_WORKPRESERVING_RESTART_ENABLED = + true; /** * How long to wait in milliseconds for the output committer to cancel Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java (revision 1597004) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java (working copy) @@ -234,8 +234,7 @@ while (true) { try { responseQueue.put(response); - if (response.getAMCommand() == AMCommand.AM_RESYNC - || response.getAMCommand() == AMCommand.AM_SHUTDOWN) { + if (response.getAMCommand() == AMCommand.AM_SHUTDOWN) { return; } break; @@ -281,6 +280,7 @@ if (response.getAMCommand() != null) { switch(response.getAMCommand()) { case AM_RESYNC: + continue; case AM_SHUTDOWN: handler.onShutdownRequest(); LOG.info("Shutdown requested. Stopping callback."); Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java (revision 1597004) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java (working copy) @@ -47,7 +47,9 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.Priority; @@ -76,11 +78,14 @@ Collections.singletonList(ResourceRequest.ANY); private int lastResponseId = 0; - + protected String appHostName; + protected int appHostPort; + protected String appTrackingUrl; protected ApplicationMasterProtocol rmClient; protected Resource clusterAvailableResources; protected int clusterNodeCount; + protected final Set blacklistedNodes = new HashSet(); protected final Set blacklistAdditions = new HashSet(); protected final Set blacklistRemovals = new HashSet(); @@ -150,7 +155,7 @@ protected final Set ask = new TreeSet( new org.apache.hadoop.yarn.api.records.ResourceRequest.ResourceRequestComparator()); protected final Set release = new TreeSet(); - + protected Set pendingRelease = new TreeSet(); public AMRMClientImpl() { super(AMRMClientImpl.class.getName()); } @@ -185,6 +190,9 @@ public RegisterApplicationMasterResponse registerApplicationMaster( String appHostName, int appHostPort, String appTrackingUrl) throws YarnException, IOException { + this.appHostName = appHostName; + this.appHostPort = appHostPort; + this.appTrackingUrl = appTrackingUrl; Preconditions.checkArgument(appHostName != null, "The host name should not be null"); Preconditions.checkArgument(appHostPort >= -1, "Port number of the host" @@ -249,7 +257,24 @@ } allocateResponse = rmClient.allocate(allocateRequest); - + if (isResyncCommand(allocateResponse)) { + LOG.warn("ApplicationMaster is out of sync with ResourceManager," + + " hence resyncing."); + // reset lastResponseId to 0 + lastResponseId = 0; + // re register with RM + registerApplicationMaster(appHostName, appHostPort, appTrackingUrl); + for (Map> rr : remoteRequestsTable + .values()) { + for (Map capabalities : rr.values()) { + for (ResourceRequestInfo request : capabalities.values()) { + addResourceRequestToAsk(request.remoteRequest); + } + } + } + + return allocateResponse; + } synchronized (this) { // update these on successful RPC clusterNodeCount = allocateResponse.getNumClusterNodes(); @@ -258,14 +283,19 @@ if (!allocateResponse.getNMTokens().isEmpty()) { populateNMTokens(allocateResponse.getNMTokens()); } + if (!allocateResponse.getCompletedContainersStatuses().isEmpty()) { + populatePendingReleaseRequests(allocateResponse + .getCompletedContainersStatuses()); + } } } finally { // TODO how to differentiate remote yarn exception vs error in rpc - if(allocateResponse == null) { + if(allocateResponse == null || isResyncCommand(allocateResponse)) { // we hit an exception in allocate() // preserve ask and release for next call to allocate() synchronized (this) { release.addAll(releaseList); + release.addAll(this.pendingRelease); // requests could have been added or deleted during call to allocate // If requests were added/removed then there is nothing to do since // the ResourceRequest object in ask would have the actual new value. @@ -279,7 +309,7 @@ ask.add(oldAsk); } } - + blacklistAdditions.addAll(this.blacklistedNodes); blacklistAdditions.addAll(blacklistToAdd); blacklistRemovals.addAll(blacklistToRemove); } @@ -287,7 +317,19 @@ } return allocateResponse; } + + protected void populatePendingReleaseRequests( + List completedContainersStatuses) { + for (ContainerStatus containerStatus : completedContainersStatuses) { + pendingRelease.remove(containerStatus.getContainerId()); + } + } + private boolean isResyncCommand(AllocateResponse allocateResponse) { + return allocateResponse.getAMCommand() != null + && allocateResponse.getAMCommand() == AMCommand.AM_RESYNC; + } + @Private @VisibleForTesting protected void populateNMTokens(List nmTokens) { @@ -414,6 +456,7 @@ public synchronized void releaseAssignedContainer(ContainerId containerId) { Preconditions.checkArgument(containerId != null, "ContainerId can not be null."); + pendingRelease.add(containerId); release.add(containerId); } @@ -655,6 +698,7 @@ if (blacklistAdditions != null) { this.blacklistAdditions.addAll(blacklistAdditions); + this.blacklistedNodes.addAll(blacklistAdditions); // if some resources are also in blacklistRemovals updated before, we // should remove them here. this.blacklistRemovals.removeAll(blacklistAdditions); @@ -662,6 +706,7 @@ if (blacklistRemovals != null) { this.blacklistRemovals.addAll(blacklistRemovals); + this.blacklistedNodes.removeAll(blacklistRemovals); // if some resources are in blacklistAdditions before, we should remove // them here. this.blacklistAdditions.removeAll(blacklistRemovals); Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java (revision 1597004) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java (working copy) @@ -212,7 +212,7 @@ final AllocateResponse rebootResponse = createAllocateResponse( new ArrayList(), new ArrayList(), null); - rebootResponse.setAMCommand(AMCommand.AM_RESYNC); + rebootResponse.setAMCommand(AMCommand.AM_SHUTDOWN); when(client.allocate(anyFloat())).thenReturn(rebootResponse); AMRMClientAsync asyncClient = Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java (revision 1597004) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java (working copy) @@ -44,7 +44,12 @@ import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; +import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -801,6 +806,158 @@ assertEquals(0, amClient.release.size()); } + @Test + public void testAMRMClientForResync() throws YarnException, IOException { + Configuration conf = new YarnConfiguration(); + conf.setInt(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 100); + conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1); + AMRMClient amClient = null; + + Resource capability = Resource.newInstance(1024, 1); + Priority priority = Priority.newInstance(1); + Priority priority2 = Priority.newInstance(2); + String[] nodes = new String[] { "node1" }; + String[] racks = new String[] { "rack1" }; + try { + // start am rm client + amClient = new MyAMRMClientImpl(); + + // setting an instance NMTokenCache + amClient.setNMTokenCache(new NMTokenCache()); + // asserting we are not using the singleton instance cache + Assert.assertNotSame(NMTokenCache.getSingleton(), + amClient.getNMTokenCache()); + + amClient.init(conf); + amClient.start(); + + amClient.registerApplicationMaster("Host", 10000, ""); + assertTrue(AMRMProtocolTestImpl.isRegistered); + + // Add container request. Total ResourceRequest is 8 + amClient.addContainerRequest(new ContainerRequest(capability, nodes, + racks, priority)); + amClient.addContainerRequest(new ContainerRequest(capability, nodes, + racks, priority2)); + + // send first heartbeat + AllocateResponse allocateResponse = amClient.allocate(0.1f); + assertTrue(allocateResponse.getAMCommand() != AMCommand.AM_RESYNC); + assertEquals("Num of ask ", 8, AMRMProtocolTestImpl.numOfAsk); + + List blackListAddition = new ArrayList(); + blackListAddition.add("node2"); + amClient.updateBlacklist(blackListAddition, null); + blackListAddition.remove("node2"); + + // send second heartbeat + allocateResponse = amClient.allocate(0.1f); + assertTrue(allocateResponse.getAMCommand() != AMCommand.AM_RESYNC); + assertEquals("Num of ask ", 8, AMRMProtocolTestImpl.numOfAsk); + assertEquals("Num of blacklisted nodes ", 1, + AMRMProtocolTestImpl.numOfBlackListAddition); + + blackListAddition.add("node3"); + amClient.updateBlacklist(blackListAddition, null); + + AMRMProtocolTestImpl.sendResync = true; + // send third heartbeat and get resync command + allocateResponse = amClient.allocate(0.1f); + AMRMProtocolTestImpl.sendResync = false; + assertTrue(allocateResponse.getAMCommand() == AMCommand.AM_RESYNC); + assertEquals("Num of ask ", 0, AMRMProtocolTestImpl.numOfAsk); + assertEquals("Num of blacklisted nodes ", 0, + AMRMProtocolTestImpl.numOfBlackListAddition); + + // Addtional check ensure app is registered after resync is issued. + assertTrue(AMRMProtocolTestImpl.isRegistered); + + // send fourth heartbeat after resync. + amClient.allocate(0.1f); + assertEquals("Num of ask ", 8, AMRMProtocolTestImpl.numOfAsk); + assertEquals("Num of blacklisted nodes ", 2, + AMRMProtocolTestImpl.numOfBlackListAddition); + + amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, + null, null); + + } finally { + if (amClient != null && amClient.getServiceState() == STATE.STARTED) { + amClient.stop(); + } + } + } + + private static class MyAMRMClientImpl extends + AMRMClientImpl { + + @Override + protected void serviceInit(Configuration conf) throws Exception { + super.serviceInit(conf); + } + + @Override + protected void serviceStart() throws Exception { + rmClient = new AMRMProtocolTestImpl(); + } + + @Override + protected void serviceStop() throws Exception { + // continue + } + } + + private static class AMRMProtocolTestImpl implements + ApplicationMasterProtocol { + + private static int numOfAsk; + private static int numOfRelease; + private static int numOfBlackListAddition; + private static boolean isRegistered = false; + private static boolean sendResync = false; + + @Override + public RegisterApplicationMasterResponse registerApplicationMaster( + RegisterApplicationMasterRequest request) throws YarnException, + IOException { + isRegistered = true; + return RegisterApplicationMasterResponse.newInstance(null, null, null, + null, new ArrayList(), null, new ArrayList()); + } + + @Override + public FinishApplicationMasterResponse finishApplicationMaster( + FinishApplicationMasterRequest request) throws YarnException, + IOException { + return FinishApplicationMasterResponse.newInstance(true); + } + + @Override + public AllocateResponse allocate(AllocateRequest request) + throws YarnException, IOException { + if (sendResync) { + // for 2nd heartbeat send resync + isRegistered = false; + numOfAsk = 0; + numOfBlackListAddition = 0; + numOfRelease = 0; + return createFakeAllocateResponse(AMCommand.AM_RESYNC); + } + numOfAsk += request.getAskList().size(); + numOfRelease += request.getReleaseList().size(); + numOfBlackListAddition += + request.getResourceBlacklistRequest().getBlacklistAdditions().size(); + return createFakeAllocateResponse(null); + } + + public AllocateResponse createFakeAllocateResponse(AMCommand command) { + return AllocateResponse.newInstance(1, new ArrayList(), + new ArrayList(), new ArrayList(), + Resource.newInstance(1024, 2), command, 1, null, + new ArrayList()); + } + } + private void sleep(int sleepTime) { try { Thread.sleep(sleepTime); Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java (revision 1597004) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java (working copy) @@ -88,7 +88,7 @@ return httpPort; } - void setResourceTrackerService(ResourceTrackerService resourceTracker) { + public void setResourceTrackerService(ResourceTrackerService resourceTracker) { this.resourceTracker = resourceTracker; }