diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java index 69a704e..150bf8a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java @@ -172,6 +172,10 @@ public void allocate(ApplicationAttemptId appAttemptId, ((AbstractYarnScheduler)rmContext.getScheduler()) .getApplicationAttempt(appAttemptId); + if (!appAttempt.getApplicationAttemptId().equals(appAttemptId)){ + return; + } + OpportunisticContainerContext oppCtx = appAttempt.getOpportunisticContainerContext(); oppCtx.updateNodeList(getLeastLoadedNodes()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java index b2be1e7..996ed73 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java @@ -81,10 +81,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; @@ -750,6 +753,44 @@ private void verifyMetrics(QueueMetrics metrics, long availableMB, } @Test(timeout = 60000) + public void testAMCrashDuringAllocate() throws Exception { + MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService()); + MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService()); + nm1.registerNode(); + nm2.registerNode(); + + RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default"); + ApplicationAttemptId attemptId0 = + app1.getCurrentAppAttempt().getAppAttemptId(); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2); + + //simulate AM crash by replacing the current attempt + //Do not use rm.failApplicationAttempt, the bug will skip due to + //ApplicationDoesNotExistInCacheException + CapacityScheduler scheduler= ((CapacityScheduler) rm.getRMContext(). + getScheduler()); + SchedulerApplication schApp = + (SchedulerApplication)scheduler. + getSchedulerApplications().get(attemptId0.getApplicationId()); + final ApplicationAttemptId appAttemptId1 = TestUtils. + getMockApplicationAttemptId(1, 1); + schApp.setCurrentAppAttempt(new FiCaSchedulerApp(appAttemptId1, + null, scheduler.getQueue("default"), null, rm.getRMContext())); + + //start to allocate + final RecordFactory factory = RecordFactoryProvider.getRecordFactory(null); + AllocateRequest allReq = + (AllocateRequestPBImpl)factory.newRecordInstance(AllocateRequest.class); + allReq.setAskList(Arrays.asList( + ResourceRequest.newInstance(Priority.UNDEFINED, "a", + Resource.newInstance(1, 2), 1, true, "exp", + ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC, true)))); + am1.allocate( + Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1), + "*", Resources.createResource(1 * GB), 2)), null); + } + + @Test(timeout = 60000) public void testNodeRemovalDuringAllocate() throws Exception { MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService()); MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService());