diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index 6070d02217c..ad64d14783c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -1084,6 +1084,7 @@ private ContainerStatus createContainerStatus(
       NMContainerStatus remoteContainer) {
     ContainerStatus cStatus =
         ContainerStatus.newInstance(remoteContainer.getContainerId(),
+            remoteContainer.getExecutionType(),
             remoteContainer.getContainerState(),
             remoteContainer.getDiagnostics(),
             remoteContainer.getContainerExitStatus());
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
index 901dc8a1430..ae615468ae3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
@@ -52,6 +52,7 @@
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
@@ -712,6 +713,86 @@ public void testContainerAutoUpdateContainer() throws Exception {
         response.getContainersToUpdate().get(0).getExecutionType());
   }
 
+  @Test
+  public void testContainerStatusOnNodeReconnect() throws Exception {
+    HashMap<NodeId, MockNM> nodes = new HashMap<>();
+    MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm1.getNodeId(), nm1);
+    MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm2.getNodeId(), nm2);
+    nm1.registerNode();
+    nm2.registerNode();
+
+    nm1.nodeHeartbeat(oppContainersStatus, true);
+    nm2.nodeHeartbeat(oppContainersStatus, true);
+
+    OpportunisticContainerAllocatorAMService amservice =
+        (OpportunisticContainerAllocatorAMService) rm
+            .getApplicationMasterService();
+    MockRMAppSubmissionData data =
+        MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
+            .withAppName("app")
+            .withUser("user")
+            .withQueue("default")
+            .withUnmanagedAM(false)
+            .build();
+    RMApp app1 = MockRMAppSubmitter.submit(rm, data);
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2);
+    ResourceScheduler scheduler = rm.getResourceScheduler();
+
+
+    // Both nodes (1 and 2) will be applicable for scheduling.
+    nm1.nodeHeartbeat(oppContainersStatus, true);
+    nm2.nodeHeartbeat(oppContainersStatus, true);
+
+    GenericTestUtils.waitFor(() ->
+        amservice.getLeastLoadedNodes().size() == 2, 10, 10 * 100);
+
+
+    AllocateResponse allocateResponse = am1.allocate(
+        Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1),
+            "*", Resources.createResource(GB), 2, true, null,
+            ExecutionTypeRequest.newInstance(
+                ExecutionType.OPPORTUNISTIC, true))),
+        null);
+    List<Container> allocatedContainers = allocateResponse
+        .getAllocatedContainers();
+    Assert.assertEquals(2, allocatedContainers.size());
+
+    // Start the containers in the NMs.
+    ArrayList<NMContainerStatus> cList = new ArrayList<>();
+    for (Container c : allocatedContainers) {
+      nodes.get(c.getNodeId()).nodeHeartbeat(Arrays.asList(
+          ContainerStatus.newInstance(c.getId(),
+              ExecutionType.OPPORTUNISTIC, ContainerState.RUNNING, "", 0)),
+          true);
+      rm.drainEvents();
+      cList.add(NMContainerStatus.newInstance(c.getId(), 0,
+          ContainerState.RUNNING, Resource.newInstance(1024, 1),
+          "opp container", 0, Priority.newInstance(0), 0, "",
+          ExecutionType.OPPORTUNISTIC, -1));
+    }
+
+    // Reconnect the NM with the NMContainerStatus list and the application.
+    nm2.registerNode(cList, Arrays.asList(app1.getApplicationId()));
+
+    // Verify that each container is still running w.r.t. the RM.
+    for (Container c : allocatedContainers) {
+      RMContainer rmContainer = ((CapacityScheduler) scheduler)
+          .getApplicationAttempt(c.getId().getApplicationAttemptId())
+          .getRMContainer(c.getId());
+      Assert.assertEquals(RMContainerState.RUNNING, rmContainer.getState());
+      Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
+          rmContainer.getExecutionType());
+
+      // Complete the container in the NM.
+      nodes.get(c.getNodeId()).nodeHeartbeat(Arrays.asList(ContainerStatus
+          .newInstance(c.getId(), ExecutionType.OPPORTUNISTIC,
+              ContainerState.COMPLETE, "", 0)), true);
+      rm.drainEvents();
+    }
+  }
+
   private void verifyMetrics(QueueMetrics metrics, long availableMB,
       int availableVirtualCores, long allocatedMB,
       int allocatedVirtualCores, int allocatedContainers) {