diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java index 9b254ae9072..d28fe625d98 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java @@ -372,6 +372,34 @@ public abstract void setUpdatedContainers( public void setUpdateErrors(List updateErrors) { } + /** + *

+ * Get the list of running containers as viewed by + * ResourceManager from previous application attempts which + * have not been reported to the Application Master yet. + *

+ * + * @return the list of running containers as viewed by + * ResourceManager from previous application attempts. + */ + @Public + @Unstable + public abstract List getContainersFromPreviousAttempts(); + + /** + * Set the list of running containers as viewed by + * ResourceManager from previous application attempts which have + * not been reported to the Application Master yet. + * + * @param containersFromPreviousAttempt + * the list of running containers as viewed by + * ResourceManager from previous application attempts. + */ + @Private + @Unstable + public abstract void setContainersFromPreviousAttempts( + List containersFromPreviousAttempt); + @Private @Unstable public static AllocateResponseBuilder newBuilder() { @@ -589,6 +617,22 @@ public AllocateResponseBuilder updateErrors( return this; } + /** + * Set the containersFromPreviousAttempt of the response. + * @see AllocateResponse#setContainersFromPreviousAttempts(List) + * @param containersFromPreviousAttempt + * containersFromPreviousAttempt of the response + * @return {@link AllocateResponseBuilder} + */ + @Private + @Unstable + public AllocateResponseBuilder containersFromPreviousAttempt( + List containersFromPreviousAttempt) { + allocateResponse.setContainersFromPreviousAttempts( + containersFromPreviousAttempt); + return this; + } + /** * Return generated {@link AllocateResponse} object. * @return {@link AllocateResponse} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index 7146b9937d6..4e97c7442d8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -117,6 +117,7 @@ message AllocateResponseProto { optional CollectorInfoProto collector_info = 14; repeated UpdateContainerErrorProto update_errors = 15; repeated UpdatedContainerProto updated_containers = 16; + repeated ContainerProto containers_from_previous_attempts = 17; } enum SchedulerResourceTypes { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java index ff35da81eb0..543c96fe682 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java @@ -74,6 +74,7 @@ Resource limit; private List allocatedContainers = null; + private List containersFromPreviousAttempts = null; private List nmTokens = null; private List completedContainersStatuses = null; private List updatedContainers = null; @@ -447,6 +448,22 @@ public synchronized void setApplicationPriority(Priority priority) { this.appPriority = priority; } + @Override + public synchronized List getContainersFromPreviousAttempts() { + initContainersFromPreviousAttemptsList(); + return this.containersFromPreviousAttempts; + } + + @Override + public synchronized void setContainersFromPreviousAttempts( + final List containers) { + if (containers == null) + return; + initContainersFromPreviousAttemptsList(); + containersFromPreviousAttempts.clear(); + containersFromPreviousAttempts.addAll(containers); + } + private synchronized void initLocalUpdatedContainerList() { if (this.updatedContainers != null) { return; @@ -491,6 +508,19 @@ private synchronized void initLocalNewContainerList() { } } + private synchronized void initContainersFromPreviousAttemptsList() { + if (this.containersFromPreviousAttempts != null) { + return; + } + AllocateResponseProtoOrBuilder p = viaProto ? proto : builder; + List list = p.getContainersFromPreviousAttemptsList(); + containersFromPreviousAttempts = new ArrayList<>(); + + for (ContainerProto c : list) { + containersFromPreviousAttempts.add(convertFromProtoFormat(c)); + } + } + private synchronized void initLocalNewNMTokenList() { if (nmTokens != null) { return; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java index 9774a1a7b6f..d023ff97a4b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java @@ -323,6 +323,9 @@ public void allocate(ApplicationAttemptId appAttemptId, // Set application priority response.setApplicationPriority(app .getApplicationPriority()); + + response.setContainersFromPreviousAttempts( + allocation.getPreviousAttemptContainers()); } private void handleNodeUpdates(RMApp app, AllocateResponse allocateResponse) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 7308fd8b0ff..8c77d0965ca 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -247,6 +247,7 @@ public ClusterNodeTracker getNodeTracker() { } Collection liveContainers = app.getCurrentAppAttempt().getLiveContainers(); + app.getCurrentAppAttempt().resetPreviousAttemptContainers(); ContainerId amContainerId = null; // For UAM, amContainer would be null if (rmContext.getRMApps().get(appId).getCurrentAppAttempt() diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java index 43eadab88a6..fd50d25aa9d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java @@ -38,6 +38,7 @@ final List decreasedContainers; final List promotedContainers; final List demotedContainers; + final List previousAttemptContainers; private Resource resourceLimit; @@ -52,7 +53,7 @@ public Allocation(List containers, Resource resourceLimit, Set strictContainers, Set fungibleContainers, List fungibleResources, List nmTokens) { this(containers, resourceLimit,strictContainers, fungibleContainers, - fungibleResources, nmTokens, null, null, null, null); + fungibleResources, nmTokens, null, null, null, null, null); } public Allocation(List containers, Resource resourceLimit, @@ -61,14 +62,15 @@ public Allocation(List containers, Resource resourceLimit, List increasedContainers, List decreasedContainer) { this(containers, resourceLimit,strictContainers, fungibleContainers, fungibleResources, nmTokens, increasedContainers, decreasedContainer, - null, null); + null, null, null); } public Allocation(List containers, Resource resourceLimit, Set strictContainers, Set fungibleContainers, List fungibleResources, List nmTokens, List increasedContainers, List decreasedContainer, - List promotedContainers, List demotedContainer) { + List promotedContainers, List demotedContainer, + List previousAttemptContainers) { this.containers = containers; this.resourceLimit = resourceLimit; this.strictContainers = strictContainers; @@ -79,6 +81,7 @@ public Allocation(List containers, Resource resourceLimit, this.decreasedContainers = decreasedContainer; this.promotedContainers = promotedContainers; this.demotedContainers = demotedContainer; + this.previousAttemptContainers = previousAttemptContainers; } public List getContainers() { @@ -121,6 +124,10 @@ public Resource getResourceLimit() { return demotedContainers; } + public List getPreviousAttemptContainers() { + return previousAttemptContainers; + } + @VisibleForTesting public void setResourceLimit(Resource resource) { this.resourceLimit = resource; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index 346bd2094b5..3862caa8fb3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -32,6 +32,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.time.DateUtils; @@ -145,6 +146,11 @@ protected List updateContainerErrors = new ArrayList<>(); + //Keeps track of recovered containers from previous attempt which haven't + //been reported to the AM. + private List recoveredPreviousAttemptContainers = + new ArrayList<>(); + // This pendingRelease is used in work-preserving recovery scenario to keep // track of the AM's outstanding release requests. RM on recovery could // receive the release request form AM before it receives the container status @@ -361,6 +367,13 @@ public void addRMContainer( ContainerId id, RMContainer rmContainer) { try { writeLock.lock(); + if (!getApplicationAttemptId().equals( + rmContainer.getApplicationAttemptId()) && + !liveContainers.containsKey(id)) { + LOG.info("recovered container " + id + + " from previous attempt " + rmContainer.getApplicationAttemptId()); + recoveredPreviousAttemptContainers.add(rmContainer); + } liveContainers.put(id, rmContainer); if (rmContainer.getExecutionType() == ExecutionType.OPPORTUNISTIC) { this.attemptOpportunisticResourceUsage.incUsed( @@ -714,6 +727,28 @@ private void updateNMToken(Container container) { } } + void resetPreviousAttemptContainers() { + try { + writeLock.lock(); + recoveredPreviousAttemptContainers.clear(); + } finally { + writeLock.unlock(); + } + } + + public List pullPreviousAttemptContainers() { + try { + writeLock.lock(); + List returnContainerList = + recoveredPreviousAttemptContainers.stream() + .map(RMContainer::getContainer).collect(Collectors.toList()); + recoveredPreviousAttemptContainers.clear(); + return returnContainerList; + } finally { + writeLock.unlock(); + } + } + // Create container token and update NMToken altogether, if either of them fails for // some reason like DNS unavailable, do not return this container and keep it // in the newlyAllocatedContainers waiting to be refetched. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index 40405fc11ae..522157c46c1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -700,6 +700,8 @@ public Allocation getAllocation(ResourceCalculator resourceCalculator, ResourceRequest rr = ResourceRequest.newBuilder() .priority(Priority.UNDEFINED).resourceName(ResourceRequest.ANY) .capability(minimumAllocation).numContainers(numCont).build(); + List previousAttemptContainers = + pullPreviousAttemptContainers(); List newlyAllocatedContainers = pullNewlyAllocatedContainers(); List newlyIncreasedContainers = pullNewlyIncreasedContainers(); List newlyDecreasedContainers = pullNewlyDecreasedContainers(); @@ -711,7 +713,8 @@ public Allocation getAllocation(ResourceCalculator resourceCalculator, return new Allocation(newlyAllocatedContainers, headroom, null, currentContPreemption, Collections.singletonList(rr), updatedNMTokens, newlyIncreasedContainers, newlyDecreasedContainers, - newlyPromotedContainers, newlyDemotedContainers); + newlyPromotedContainers, newlyDemotedContainers, + previousAttemptContainers); } finally { writeLock.unlock(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 7f1b91e32af..7d26d6acbf0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -880,7 +880,8 @@ public Allocation allocate(ApplicationAttemptId appAttemptId, preemptionContainerIds, null, null, application.pullUpdatedNMTokens(), null, null, application.pullNewlyPromotedContainers(), - application.pullNewlyDemotedContainers()); + application.pullNewlyDemotedContainers(), + application.pullPreviousAttemptContainers()); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java index c43069bac08..414fa071225 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java @@ -24,6 +24,9 @@ import java.util.List; import java.util.Map; +import com.google.common.base.Throwables; +import com.google.common.collect.Lists; +import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; @@ -42,6 +45,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.MockMemoryRMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart; import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -53,6 +57,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.util.ControlledClock; import org.apache.hadoop.yarn.util.Records; import org.junit.Assert; @@ -993,4 +998,131 @@ public void testAMRestartNotLostContainerAfterAttemptFailuresValidityInterval() rm1.waitForState(app1.getApplicationId(), RMAppState.RUNNING); rm1.stop(); } + + @Test(timeout = 200000) + public void testContainersFromPreviousAttemptsWithRMRestart() + throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); + conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); + conf.setBoolean( + YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, true); + conf.setLong( + YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS, 0); + conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); + + MockRM rm1 = new MockRM(conf); + MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore(); + rm1.start(); + YarnScheduler scheduler = rm1.getResourceScheduler(); + + MockNM nm1 = new MockNM("127.0.0.1:1234", 10240, + rm1.getResourceTrackerService()); + nm1.registerNode(); + + MockNM nm2 = new MockNM("127.0.0.1:2351", 4089, + rm1.getResourceTrackerService()); + nm2.registerNode(); + + RMApp app1 = rm1.submitApp(200, "name", "user", + new HashMap<>(), false, "default", -1, + null, "MAPREDUCE", false, true); + + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + allocateContainers(nm1, am1, 1); + allocateContainers(nm2, am1, 1); + + // container 2 launched and running on node 1 + nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 2, + ContainerState.RUNNING); + ContainerId containerId2 = + ContainerId.newContainerId(am1.getApplicationAttemptId(), 2); + rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING); + + // container 3 launched and running node 2 + nm2.nodeHeartbeat(am1.getApplicationAttemptId(), 3, + ContainerState.RUNNING); + ContainerId containerId3 = + ContainerId.newContainerId(am1.getApplicationAttemptId(), 3); + rm1.waitForState(nm2, containerId3, RMContainerState.RUNNING); + + // fail the AM normally + nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 1, + ContainerState.COMPLETE); + rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED); + TestSchedulerUtils.waitSchedulerApplicationAttemptStopped( + (AbstractYarnScheduler)scheduler, am1.getApplicationAttemptId()); + + // restart rm + MockRM rm2 = new MockRM(conf, memStore); + rm2.start(); + nm1.setResourceTrackerService(rm2.getResourceTrackerService()); + NMContainerStatus container2Status = + TestRMRestart.createNMContainerStatus(am1.getApplicationAttemptId(), 2, + ContainerState.RUNNING); + nm1.registerNode(Lists.newArrayList(container2Status), null); + + + // Wait for RM to settle down on recovering containers; + Thread.sleep(3000); + nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 2, + ContainerState.RUNNING); + rm2.waitForState(nm1, containerId2, RMContainerState.RUNNING); + Assert.assertNotNull(rm2.getResourceScheduler() + .getRMContainer(containerId2)); + + // wait for app to start a new attempt. + rm2.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); + // assert this is a new AM. + ApplicationAttemptId newAttemptId = + app1.getCurrentAppAttempt().getAppAttemptId(); + Assert.assertFalse(newAttemptId.equals(am1.getApplicationAttemptId())); + + // launch the new AM + MockAM am2 = MockRM.launchAMWhenAsyncSchedulingEnabled(app1, rm2); + RegisterApplicationMasterResponse registerResponse = + am2.registerAppAttempt(); + + // container2 is recovered from previous attempt + Assert.assertEquals(1, + registerResponse.getContainersFromPreviousAttempts().size()); + Assert.assertEquals("container 2", containerId2, + registerResponse.getContainersFromPreviousAttempts().get(0).getId()); + + rm2.waitForState(app1.getApplicationId(), RMAppState.RUNNING); + + //NM2 is back + nm2.setResourceTrackerService(rm2.getResourceTrackerService()); + NMContainerStatus container3Status = + TestRMRestart.createNMContainerStatus(am1.getApplicationAttemptId(), 3, + ContainerState.RUNNING); + nm2.registerNode(Lists.newArrayList(container3Status), null); + + nm2.nodeHeartbeat(am1.getApplicationAttemptId(), 3, + ContainerState.RUNNING); + rm2.waitForState(nm2, containerId3, RMContainerState.RUNNING); + Assert.assertNotNull(rm2.getResourceScheduler() + .getRMContainer(containerId3)); + + List containersFromPreviousAttempts = new ArrayList<>(); + GenericTestUtils.waitFor(() -> { + try { + AllocateResponse allocateResponse = am2.doHeartbeat(); + if (allocateResponse.getContainersFromPreviousAttempts().size() > 0){ + containersFromPreviousAttempts.addAll( + allocateResponse.getContainersFromPreviousAttempts()); + Assert.assertEquals("new containers should not be allocated", + 0, allocateResponse.getAllocatedContainers().size()); + return true; + } + } catch (Exception e) { + Throwables.propagate(e); + } + return false; + }, 2000, 200000); + Assert.assertEquals("container 3", containerId3, + containersFromPreviousAttempts.get(0).getId()); + rm2.stop(); + rm1.stop(); + } }