diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java index e4a8a1a..5e69f40 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java @@ -2516,7 +2516,7 @@ public void testRMContainerAllocatorResendsRequestsOnRMRestart() RMApp app = rm1.submitApp(1024); rm1.drainEvents(); - MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService()); + MockNM nm1 = new MockNM("h1:1234", 15120, rm1); nm1.registerNode(); nm1.nodeHeartbeat(true); // Node heartbeat rm1.drainEvents(); @@ -2606,7 +2606,7 @@ public void testRMContainerAllocatorResendsRequestsOnRMRestart() Assert.assertEquals(NodeAction.RESYNC, hbResponse.getNodeAction()); // new NM to represent NM re-register - nm1 = new MockNM("h1:1234", 10240, rm2.getResourceTrackerService()); + nm1 = new MockNM("h1:1234", 10240, rm2); nm1.registerNode(); nm1.nodeHeartbeat(true); rm2.drainEvents(); @@ -2757,7 +2757,7 @@ public void testRMUnavailable() RMApp app = rm1.submitApp(1024); rm1.drainEvents(); - MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService()); + MockNM nm1 = new MockNM("h1:1234", 15120, rm1); nm1.registerNode(); nm1.nodeHeartbeat(true); rm1.drainEvents(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java index fa3c6af..1a06e08 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java @@ -129,7 +129,7 @@ public void testAMRMClientResendsRequestsOnRMRestart() throws Exception { RMApp app = rm1.submitApp(1024); rm1.drainEvents(); - MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService()); + MockNM nm1 = new MockNM("h1:1234", 15120, rm1); nm1.registerNode(); nm1.nodeHeartbeat(true); // Node heartbeat rm1.drainEvents(); @@ -260,7 +260,7 @@ public void testAMRMClientResendsRequestsOnRMRestart() throws Exception { Assert.assertEquals(NodeAction.RESYNC, hbResponse.getNodeAction()); // new NM to represent NM re-register - nm1 = new MockNM("h1:1234", 10240, rm2.getResourceTrackerService()); + nm1 = new MockNM("h1:1234", 10240, rm2); NMContainerStatus containerReport = NMContainerStatus.newInstance(containerId, 0, ContainerState.RUNNING, Resource.newInstance(1024, 1), "recover container", 0, @@ -358,7 +358,7 @@ public void testAMRMClientForUnregisterAMOnRMRestart() throws Exception { RMApp app = rm1.submitApp(1024); rm1.drainEvents(); - MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService()); + MockNM nm1 = new MockNM("h1:1234", 15120, rm1); nm1.registerNode(); nm1.nodeHeartbeat(true); // Node heartbeat rm1.drainEvents(); @@ -392,7 +392,7 @@ public void testAMRMClientForUnregisterAMOnRMRestart() throws Exception { Assert.assertEquals(NodeAction.RESYNC, hbResponse.getNodeAction()); // new NM to represent NM re-register - nm1 = new MockNM("h1:1234", 10240, rm2.getResourceTrackerService()); + nm1 = new MockNM("h1:1234", 10240, rm2); ContainerId containerId = ContainerId.newContainerId(appAttemptId, 1); NMContainerStatus containerReport = @@ -435,7 +435,7 @@ public void testAMRMClientOnAMRMTokenRollOverOnRMRestart() throws Exception { RMApp app = rm1.submitApp(1024); rm1.drainEvents(); - MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService()); + MockNM nm1 = new MockNM("h1:1234", 15120, rm1); nm1.registerNode(); nm1.nodeHeartbeat(true); // Node heartbeat rm1.drainEvents(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 5333f25..76c79ee 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -221,7 +221,7 @@ protected static void setClusterTimeStamp(long timestamp) { } @VisibleForTesting - Dispatcher getRmDispatcher() { + public Dispatcher getRmDispatcher() { return rmDispatcher; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index cc47e02..9a70ee8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -507,12 +507,13 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) // 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat NodeHeartbeatResponse lastNodeHeartbeatResponse = rmNode.getLastNodeHeartBeatResponse(); - if (remoteNodeStatus.getResponseId() + 1 == lastNodeHeartbeatResponse - .getResponseId()) { + if (getNextResponseId( + remoteNodeStatus.getResponseId()) == lastNodeHeartbeatResponse + .getResponseId()) { LOG.info("Received duplicate heartbeat from node " + rmNode.getNodeAddress()+ " responseId=" + remoteNodeStatus.getResponseId()); return lastNodeHeartbeatResponse; - } else if (remoteNodeStatus.getResponseId() + 1 < lastNodeHeartbeatResponse + } else if (remoteNodeStatus.getResponseId() != lastNodeHeartbeatResponse .getResponseId()) { String message = "Too far behind rm response id:" @@ -548,10 +549,10 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) } // Heartbeat response - NodeHeartbeatResponse nodeHeartBeatResponse = YarnServerBuilderUtils - .newNodeHeartbeatResponse(lastNodeHeartbeatResponse. - getResponseId() + 1, NodeAction.NORMAL, null, null, null, null, - nextHeartBeatInterval); + NodeHeartbeatResponse nodeHeartBeatResponse = + YarnServerBuilderUtils.newNodeHeartbeatResponse( + getNextResponseId(lastNodeHeartbeatResponse.getResponseId()), + NodeAction.NORMAL, null, null, null, null, nextHeartBeatInterval); rmNode.updateNodeHeartbeatResponseForCleanup(nodeHeartBeatResponse); rmNode.updateNodeHeartbeatResponseForUpdatedContainers( nodeHeartBeatResponse); @@ -613,6 +614,11 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) return nodeHeartBeatResponse; } + private int getNextResponseId(int responseId) { + // Loop between 0 and Integer.MAX_VALUE + return (responseId + 1) & Integer.MAX_VALUE; + } + private void setAppCollectorsMapToResponse( List runningApps, NodeHeartbeatResponse response) { Map liveAppCollectorsMap = new diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java index 4a8ff00..af9eac7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java @@ -55,6 +55,7 @@ private NodeId nodeId; private long memory; private int vCores; + private MockRM rm; private ResourceTrackerService resourceTracker; private int httpPort = 2; private MasterKey currentContainerTokenMasterKey; @@ -65,24 +66,24 @@ private Map registeringCollectors = new ConcurrentHashMap<>(); - public MockNM(String nodeIdStr, int memory, ResourceTrackerService resourceTracker) { + public MockNM(String nodeIdStr, int memory, MockRM rm) { // scale vcores based on the requested memory this(nodeIdStr, memory, Math.max(1, (memory * YarnConfiguration.DEFAULT_NM_VCORES) / YarnConfiguration.DEFAULT_NM_PMEM_MB), - resourceTracker); + rm); } - public MockNM(String nodeIdStr, int memory, int vcores, - ResourceTrackerService resourceTracker) { - this(nodeIdStr, memory, vcores, resourceTracker, YarnVersionInfo.getVersion()); + public MockNM(String nodeIdStr, int memory, int vcores, MockRM rm) { + this(nodeIdStr, memory, vcores, rm, YarnVersionInfo.getVersion()); } - public MockNM(String nodeIdStr, int memory, int vcores, - ResourceTrackerService resourceTracker, String version) { + public MockNM(String nodeIdStr, int memory, int vcores, MockRM rm, + String version) { this.memory = memory; this.vCores = vcores; - this.resourceTracker = resourceTracker; + this.rm = rm; + this.resourceTracker = rm.getResourceTrackerService(); this.version = version; String[] splits = nodeIdStr.split(":"); nodeId = BuilderUtils.newNodeId(splits[0], Integer.parseInt(splits[1])); @@ -118,7 +119,7 @@ public void containerIncreaseStatus(Container container) throws Exception { container.getResource()); List increasedConts = Collections.singletonList(container); nodeHeartbeat(Collections.singletonList(containerStatus), increasedConts, - true, ++responseId); + true, responseId); } public void addRegisteringCollector(ApplicationId appId, @@ -172,12 +173,13 @@ public RegisterNodeManagerResponse registerNode( } } } + responseId = 0; return registrationResponse; } public NodeHeartbeatResponse nodeHeartbeat(boolean isHealthy) throws Exception { return nodeHeartbeat(Collections.emptyList(), - Collections.emptyList(), isHealthy, ++responseId); + Collections.emptyList(), isHealthy, responseId); } public NodeHeartbeatResponse nodeHeartbeat(ApplicationAttemptId attemptId, @@ -190,12 +192,12 @@ public NodeHeartbeatResponse nodeHeartbeat(ApplicationAttemptId attemptId, containerStatusList.add(containerStatus); Log.getLog().info("ContainerStatus: " + containerStatus); return nodeHeartbeat(containerStatusList, - Collections.emptyList(), true, ++responseId); + Collections.emptyList(), true, responseId); } public NodeHeartbeatResponse nodeHeartbeat(Map> conts, boolean isHealthy) throws Exception { - return nodeHeartbeat(conts, isHealthy, ++responseId); + return nodeHeartbeat(conts, isHealthy, responseId); } public NodeHeartbeatResponse nodeHeartbeat(Map updatedStats, boolean isHealthy) throws Exception { return nodeHeartbeat(updatedStats, Collections.emptyList(), - isHealthy, ++responseId); + isHealthy, responseId); } public NodeHeartbeatResponse nodeHeartbeat(List updatedStats, @@ -247,7 +249,9 @@ public NodeHeartbeatResponse nodeHeartbeat(List updatedStats, NodeHeartbeatResponse heartbeatResponse = resourceTracker.nodeHeartbeat(req); - + rm.drainEventsImplicitly(); + responseId = heartbeatResponse.getResponseId(); + MasterKey masterKeyFromRM = heartbeatResponse.getContainerTokenMasterKey(); if (masterKeyFromRM != null && masterKeyFromRM.getKeyId() != this.currentContainerTokenMasterKey @@ -282,4 +286,8 @@ public int getvCores() { public String getVersion() { return version; } + + public void setResponseId(int id) { + this.responseId = id; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 1235774..f7adf20 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -815,7 +815,7 @@ public SubmitApplicationResponse run() throws IOException, YarnException { } public MockNM registerNode(String nodeIdStr, int memory) throws Exception { - MockNM nm = new MockNM(nodeIdStr, memory, getResourceTrackerService()); + MockNM nm = new MockNM(nodeIdStr, memory, this); nm.registerNode(); drainEventsImplicitly(); return nm; @@ -823,8 +823,7 @@ public MockNM registerNode(String nodeIdStr, int memory) throws Exception { public MockNM registerNode(String nodeIdStr, int memory, int vCores) throws Exception { - MockNM nm = - new MockNM(nodeIdStr, memory, vCores, getResourceTrackerService()); + MockNM nm = new MockNM(nodeIdStr, memory, vCores, this); nm.registerNode(); drainEventsImplicitly(); return nm; @@ -832,9 +831,8 @@ public MockNM registerNode(String nodeIdStr, int memory, int vCores) public MockNM registerNode(String nodeIdStr, int memory, int vCores, List runningApplications) throws Exception { - MockNM nm = - new MockNM(nodeIdStr, memory, vCores, getResourceTrackerService(), - YarnVersionInfo.getVersion()); + MockNM nm = new MockNM(nodeIdStr, memory, vCores, this, + YarnVersionInfo.getVersion()); nm.registerNode(runningApplications); drainEventsImplicitly(); return nm; @@ -1262,7 +1260,7 @@ public void waitForAppRemovedFromScheduler(ApplicationId appId) LOG.info("app is removed from scheduler, " + appId); } - private void drainEventsImplicitly() { + public void drainEventsImplicitly() { if (!disableDrainEventsImplicitly) { drainEvents(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java index ebca7a3..4e970d0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java @@ -293,8 +293,7 @@ public void testAppCleanupWhenRMRestartedAfterAppFinished() throws Exception { // start RM MockRM rm1 = new MockRM(conf); rm1.start(); - MockNM nm1 = - new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); + MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1); nm1.registerNode(); // create app and launch the AM @@ -327,11 +326,9 @@ public void testAppCleanupWhenRMRestartedBeforeAppFinished() throws Exception { // start RM MockRM rm1 = new MockRM(conf); rm1.start(); - MockNM nm1 = - new MockNM("127.0.0.1:1234", 1024, rm1.getResourceTrackerService()); + MockNM nm1 = new MockNM("127.0.0.1:1234", 1024, rm1); nm1.registerNode(); - MockNM nm2 = - new MockNM("127.0.0.1:5678", 1024, rm1.getResourceTrackerService()); + MockNM nm2 = new MockNM("127.0.0.1:5678", 1024, rm1); nm2.registerNode(); // create app and launch the AM @@ -384,8 +381,7 @@ public void testContainerCleanupWhenRMRestartedAppNotRegistered() throws // start RM MockRM rm1 = new MockRM(conf); rm1.start(); - MockNM nm1 = - new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); + MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1); nm1.registerNode(); // create app and launch the AM @@ -420,8 +416,7 @@ public void testAppCleanupWhenNMReconnects() throws Exception { // start RM MockRM rm1 = new MockRM(conf); rm1.start(); - MockNM nm1 = - new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); + MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1); nm1.registerNode(); // create app and launch the AM @@ -461,8 +456,7 @@ public void testProcessingNMContainerStatusesOnNMRestart() throws Exception { int nmMemory = 8192; int amMemory = 1024; int containerMemory = 2048; - MockNM nm1 = - new MockNM("127.0.0.1:1234", nmMemory, rm1.getResourceTrackerService()); + MockNM nm1 = new MockNM("127.0.0.1:1234", nmMemory, rm1); nm1.registerNode(); RMApp app0 = rm1.submitApp(amMemory); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestContainerResourceUsage.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestContainerResourceUsage.java index ba9de6c..ed196bd 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestContainerResourceUsage.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestContainerResourceUsage.java @@ -73,9 +73,8 @@ public void tearDown() { public void testUsageWithOneAttemptAndOneContainer() throws Exception { MockRM rm = new MockRM(conf); rm.start(); - - MockNM nm = - new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService()); + + MockNM nm = new MockNM("127.0.0.1:1234", 15120, rm); nm.registerNode(); RMApp app0 = rm.submitApp(200); @@ -141,8 +140,7 @@ public void testUsageWithMultipleContainersAndRMRestart() throws Exception { conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); MockRM rm0 = new MockRM(conf); rm0.start(); - MockNM nm = - new MockNM("127.0.0.1:1234", 65536, rm0.getResourceTrackerService()); + MockNM nm = new MockNM("127.0.0.1:1234", 65536, rm0); nm.registerNode(); RMApp app0 = rm0.submitApp(200); @@ -264,8 +262,7 @@ private void amRestartTests(boolean keepRunningContainers) rm.submitApp(200, "name", "user", new HashMap(), false, "default", -1, null, "MAPREDUCE", false, keepRunningContainers); - MockNM nm = - new MockNM("127.0.0.1:1234", 10240, rm.getResourceTrackerService()); + MockNM nm = new MockNM("127.0.0.1:1234", 10240, rm); nm.registerNode(); MockAM am0 = MockRM.launchAndRegisterAM(app, rm, nm); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestKillApplicationWithRMHA.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestKillApplicationWithRMHA.java index 149ddf5..9869186 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestKillApplicationWithRMHA.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestKillApplicationWithRMHA.java @@ -56,8 +56,7 @@ public void testKillAppWhenFailoverHappensAtNewState() // the RMAppState will always be NEW. // The ApplicationState will not be saved in RMStateStore. startRMsWithCustomizedRMAppManager(); - MockNM nm1 = - new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); + MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1); nm1.registerNode(); // Submit the application @@ -87,8 +86,7 @@ public void testKillAppWhenFailoverHappensAtNewState() public void testKillAppWhenFailoverHappensAtRunningState() throws Exception { startRMs(); - MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, - rm1.getResourceTrackerService()); + MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1); nm1.registerNode(); // create app and launch the AM @@ -110,8 +108,7 @@ public void testKillAppWhenFailoverHappensAtRunningState() public void testKillAppWhenFailoverHappensAtFinalState() throws Exception { startRMs(); - MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, - rm1.getResourceTrackerService()); + MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1); nm1.registerNode(); // create app and launch the AM @@ -141,8 +138,7 @@ public void testKillAppWhenFailOverHappensDuringApplicationKill() // When receives the killApplicationRequest, simply return the response // and make sure the application will not be KILLED State startRMsWithCustomizedClientRMService(); - MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, - rm1.getResourceTrackerService()); + MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1); nm1.registerNode(); // create app and launch the AM diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestNodeBlacklistingOnAMFailures.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestNodeBlacklistingOnAMFailures.java index 5266210..d8782fb 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestNodeBlacklistingOnAMFailures.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestNodeBlacklistingOnAMFailures.java @@ -69,12 +69,10 @@ public void testNodeBlacklistingOnAMFailure() throws Exception { // Register 5 nodes, so that we can blacklist atleast one if AM container // is failed. As per calculation it will be like, 5nodes * 0.2 (default)=1. // First register 2 nodes, and after AM lauched register 3 more nodes. - MockNM nm1 = - new MockNM("127.0.0.1:1234", 8000, rm.getResourceTrackerService()); + MockNM nm1 = new MockNM("127.0.0.1:1234", 8000, rm); nm1.registerNode(); - MockNM nm2 = - new MockNM("127.0.0.2:2345", 8000, rm.getResourceTrackerService()); + MockNM nm2 = new MockNM("127.0.0.2:2345", 8000, rm); nm2.registerNode(); RMApp app = rm.submitApp(200); @@ -95,16 +93,13 @@ public void testNodeBlacklistingOnAMFailure() throws Exception { } // register 3 nodes now - MockNM nm3 = - new MockNM("127.0.0.3:2345", 8000, rm.getResourceTrackerService()); + MockNM nm3 = new MockNM("127.0.0.3:2345", 8000, rm); nm3.registerNode(); - MockNM nm4 = - new MockNM("127.0.0.4:2345", 8000, rm.getResourceTrackerService()); + MockNM nm4 = new MockNM("127.0.0.4:2345", 8000, rm); nm4.registerNode(); - MockNM nm5 = - new MockNM("127.0.0.5:2345", 8000, rm.getResourceTrackerService()); + MockNM nm5 = new MockNM("127.0.0.5:2345", 8000, rm); nm5.registerNode(); // Set the exist status to INVALID so that we can verify that the system @@ -171,24 +166,19 @@ public void testNodeBlacklistingOnAMFailureStrictNodeLocality() // Register 5 nodes, so that we can blacklist atleast one if AM container // is failed. As per calculation it will be like, 5nodes * 0.2 (default)=1. - MockNM nm1 = - new MockNM("127.0.0.1:1234", 8000, rm.getResourceTrackerService()); + MockNM nm1 = new MockNM("127.0.0.1:1234", 8000, rm); nm1.registerNode(); - MockNM nm2 = - new MockNM("127.0.0.2:2345", 8000, rm.getResourceTrackerService()); + MockNM nm2 = new MockNM("127.0.0.2:2345", 8000, rm); nm2.registerNode(); - MockNM nm3 = - new MockNM("127.0.0.3:2345", 8000, rm.getResourceTrackerService()); + MockNM nm3 = new MockNM("127.0.0.3:2345", 8000, rm); nm3.registerNode(); - MockNM nm4 = - new MockNM("127.0.0.4:2345", 8000, rm.getResourceTrackerService()); + MockNM nm4 = new MockNM("127.0.0.4:2345", 8000, rm); nm4.registerNode(); - MockNM nm5 = - new MockNM("127.0.0.5:2345", 8000, rm.getResourceTrackerService()); + MockNM nm5 = new MockNM("127.0.0.5:2345", 8000, rm); nm5.registerNode(); // Specify a strict locality on nm2 @@ -258,24 +248,19 @@ public void testNodeBlacklistingOnAMFailureRelaxedNodeLocality() // Register 5 nodes, so that we can blacklist atleast one if AM container // is failed. As per calculation it will be like, 5nodes * 0.2 (default)=1. - MockNM nm1 = - new MockNM("127.0.0.1:1234", 8000, rm.getResourceTrackerService()); + MockNM nm1 = new MockNM("127.0.0.1:1234", 8000, rm); nm1.registerNode(); - MockNM nm2 = - new MockNM("127.0.0.2:2345", 8000, rm.getResourceTrackerService()); + MockNM nm2 = new MockNM("127.0.0.2:2345", 8000, rm); nm2.registerNode(); - MockNM nm3 = - new MockNM("127.0.0.3:2345", 8000, rm.getResourceTrackerService()); + MockNM nm3 = new MockNM("127.0.0.3:2345", 8000, rm); nm3.registerNode(); - MockNM nm4 = - new MockNM("127.0.0.4:2345", 8000, rm.getResourceTrackerService()); + MockNM nm4 = new MockNM("127.0.0.4:2345", 8000, rm); nm4.registerNode(); - MockNM nm5 = - new MockNM("127.0.0.5:2345", 8000, rm.getResourceTrackerService()); + MockNM nm5 = new MockNM("127.0.0.5:2345", 8000, rm); nm5.registerNode(); // Specify a relaxed locality on nm2 @@ -349,8 +334,7 @@ public void testNoBlacklistingForNonSystemErrors() throws Exception { MockRM rm = startRM(conf); - MockNM node = - new MockNM("127.0.0.1:1234", 8000, rm.getResourceTrackerService()); + MockNM node = new MockNM("127.0.0.1:1234", 8000, rm); node.registerNode(); RMApp app = rm.submitApp(200); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java index 9b9eb3c..4024a5e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java @@ -161,13 +161,13 @@ public void stopRM() { @Test(timeout = 600000) public void testContainerPromoteAndDemoteBeforeContainerStart() throws Exception { HashMap nodes = new HashMap<>(); - MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService()); + MockNM nm1 = new MockNM("h1:1234", 4096, rm); nodes.put(nm1.getNodeId(), nm1); - MockNM nm2 = new MockNM("h1:4321", 4096, rm.getResourceTrackerService()); + MockNM nm2 = new MockNM("h1:4321", 4096, rm); nodes.put(nm2.getNodeId(), nm2); - MockNM nm3 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService()); + MockNM nm3 = new MockNM("h2:1234", 4096, rm); nodes.put(nm3.getNodeId(), nm3); - MockNM nm4 = new MockNM("h2:4321", 4096, rm.getResourceTrackerService()); + MockNM nm4 = new MockNM("h2:4321", 4096, rm); nodes.put(nm4.getNodeId(), nm4); nm1.registerNode(); nm2.registerNode(); @@ -333,9 +333,9 @@ public void testContainerPromoteAndDemoteBeforeContainerStart() throws Exception @Test(timeout = 60000) public void testContainerPromoteAfterContainerStart() throws Exception { HashMap nodes = new HashMap<>(); - MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService()); + MockNM nm1 = new MockNM("h1:1234", 4096, rm); nodes.put(nm1.getNodeId(), nm1); - MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService()); + MockNM nm2 = new MockNM("h2:1234", 4096, rm); nodes.put(nm2.getNodeId(), nm2); nm1.registerNode(); nm2.registerNode(); @@ -458,9 +458,9 @@ public void testContainerPromoteAfterContainerStart() throws Exception { @Test(timeout = 600000) public void testContainerPromoteAfterContainerComplete() throws Exception { HashMap nodes = new HashMap<>(); - MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService()); + MockNM nm1 = new MockNM("h1:1234", 4096, rm); nodes.put(nm1.getNodeId(), nm1); - MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService()); + MockNM nm2 = new MockNM("h2:1234", 4096, rm); nodes.put(nm2.getNodeId(), nm2); nm1.registerNode(); nm2.registerNode(); @@ -575,7 +575,7 @@ public void testContainerPromoteAfterContainerComplete() throws Exception { public void testContainerAutoUpdateContainer() throws Exception { rm.stop(); createAndStartRMWithAutoUpdateContainer(); - MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService()); + MockNM nm1 = new MockNM("h1:1234", 4096, rm); nm1.registerNode(); OpportunisticContainerAllocatorAMService amservice = @@ -736,8 +736,8 @@ private void verifyMetrics(QueueMetrics metrics, long availableMB, @Test(timeout = 60000) public void testNodeRemovalDuringAllocate() throws Exception { - MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService()); - MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService()); + MockNM nm1 = new MockNM("h1:1234", 4096, rm); + MockNM nm2 = new MockNM("h2:1234", 4096, rm); nm1.registerNode(); nm2.registerNode(); OpportunisticContainerAllocatorAMService amservice = diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java index 39313d0..aa62829 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java @@ -453,8 +453,8 @@ public void testActivatingApplicationAfterAddingNM() throws Exception { ApplicationAttemptId attemptId2 = attempt2.getAppAttemptId(); rm1.waitForState(attemptId2, RMAppAttemptState.SCHEDULED); - MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService()); - MockNM nm2 = new MockNM("h2:5678", 15120, rm1.getResourceTrackerService()); + MockNM nm1 = new MockNM("h1:1234", 15120, rm1); + MockNM nm2 = new MockNM("h2:5678", 15120, rm1); nm1.registerNode(); nm2.registerNode(); @@ -484,8 +484,7 @@ public void testInvalidateAMHostPortWhenAMFailedOrKilled() throws Exception { // a succeeded app RMApp app1 = rm1.submitApp(200); - MockNM nm1 = - new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); + MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1); nm1.registerNode(); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); MockRM.finishAMAndVerifyAppState(app1, rm1, nm1, am1); @@ -532,8 +531,7 @@ public void testInvalidateAMHostPortWhenAMFailedOrKilled() throws Exception { public void testInvalidatedAMHostPortOnAMRestart() throws Exception { MockRM rm1 = new MockRM(conf); rm1.start(); - MockNM nm1 = - new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); + MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1); nm1.registerNode(); // a failed app @@ -598,9 +596,8 @@ protected Dispatcher createDispatcher() { int appsSubmitted = metrics.getAppsSubmitted(); rm.start(); - - MockNM nm1 = - new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService()); + + MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm); nm1.registerNode(); // a failed app @@ -679,8 +676,7 @@ protected Dispatcher createDispatcher() { } }; rm1.start(); - MockNM nm1 = - new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService()); + MockNM nm1 = new MockNM("127.0.0.1:1234", 8192, rm1); nm1.registerNode(); RMApp app1 = rm1.submitApp(200); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); @@ -733,8 +729,7 @@ protected Dispatcher createDispatcher() { } }; rm1.start(); - MockNM nm1 = - new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService()); + MockNM nm1 = new MockNM("127.0.0.1:1234", 8192, rm1); nm1.registerNode(); RMApp app1 = rm1.submitApp(200); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHATimelineCollectors.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHATimelineCollectors.java index fa0d318..19552ff 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHATimelineCollectors.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHATimelineCollectors.java @@ -61,9 +61,9 @@ public void setup() throws Exception { public void testRebuildCollectorDataOnFailover() throws Exception { startRMs(); MockNM nm1 - = new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService()); + = new MockNM("127.0.0.1:1234", 15120, rm2); MockNM nm2 - = new MockNM("127.0.0.1:5678", 15121, rm2.getResourceTrackerService()); + = new MockNM("127.0.0.1:5678", 15121, rm2); RMApp app1 = rm1.submitApp(1024); String collectorAddr1 = "1.2.3.4:5"; AppCollectorData data1 = AppCollectorData.newInstance( diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index 5cbcdbc..66e358c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -200,11 +200,9 @@ public void testRMRestart() throws Exception { // start like normal because state is empty rm1.start(); - - MockNM nm1 = - new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); - MockNM nm2 = - new MockNM("127.0.0.2:5678", 15120, rm1.getResourceTrackerService()); + + MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1); + MockNM nm2 = new MockNM("127.0.0.2:5678", 15120, rm1); nm1.registerNode(); nm2.registerNode(); // nm2 will not heartbeat with RM1 @@ -357,8 +355,8 @@ public void testRMRestart() throws Exception { Assert.assertEquals(NodeAction.RESYNC, hbResponse.getNodeAction()); // new NM to represent NM re-register - nm1 = new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService()); - nm2 = new MockNM("127.0.0.2:5678", 15120, rm2.getResourceTrackerService()); + nm1 = new MockNM("127.0.0.1:1234", 15120, rm2); + nm2 = new MockNM("127.0.0.2:5678", 15120, rm2); NMContainerStatus status = TestRMRestart @@ -461,7 +459,7 @@ public void testRMRestartAppRunningAMFailed() throws Exception { memStore.getState().getApplicationState(); rm1.start(); MockNM nm1 = - new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); + new MockNM("127.0.0.1:1234", 15120, rm1); nm1.registerNode(); // create app and launch the AM @@ -518,8 +516,7 @@ public void testRMRestartWaitForPreviousAMToFinish() throws Exception { rm1.start(); AbstractYarnScheduler ys = (AbstractYarnScheduler)rm1.getResourceScheduler(); - MockNM nm1 = - new MockNM("127.0.0.1:1234" , 16382, rm1.getResourceTrackerService()); + MockNM nm1 = new MockNM("127.0.0.1:1234", 16382, rm1); nm1.registerNode(); // submitting app @@ -733,8 +730,7 @@ public void testRMRestartFailedApp() throws Exception { memStore.getState().getApplicationState(); // start RM rm1.start(); - MockNM nm1 = - new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); + MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1); nm1.registerNode(); // create app and launch the AM @@ -779,8 +775,7 @@ public void testRMRestartKilledApp() throws Exception{ memStore.getState().getApplicationState(); // start RM rm1.start(); - MockNM nm1 = - new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); + MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1); nm1.registerNode(); // create app and launch the AM @@ -868,8 +863,7 @@ public void testRMRestartSucceededApp() throws Exception { // start like normal because state is empty rm1.start(); - MockNM nm1 = - new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); + MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1); nm1.registerNode(); // create an app and finish the app. @@ -916,8 +910,7 @@ protected SystemMetricsPublisher createSystemMetricsPublisher() { }; rms.add(rm1); rm1.start(); - MockNM nm1 = - new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); + MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1); nm1.registerNode(); // a succeeded app. @@ -1075,8 +1068,7 @@ public void testRMRestartOnMaxAppAttempts() throws Exception { memStore.getState().getApplicationState(); // start RM rm1.start(); - MockNM nm1 = - new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); + MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1); nm1.registerNode(); // submit an app with maxAppAttempts equals to 1 @@ -1145,8 +1137,7 @@ public void testRMRestartTimelineCollectorContext() throws Exception { MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore(); Map rmAppState = memStore.getState().getApplicationState(); - MockNM nm1 = - new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); + MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1); nm1.registerNode(); // submit an app. @@ -1302,8 +1293,7 @@ public void testAppAttemptTokensRestoredOnRMRestart() throws Exception { memStore.getState().getApplicationState(); // start RM rm1.start(); - MockNM nm1 = - new MockNM("0.0.0.0:4321", 15120, rm1.getResourceTrackerService()); + MockNM nm1 = new MockNM("0.0.0.0:4321", 15120, rm1); nm1.registerNode(); // submit an app @@ -1621,8 +1611,7 @@ public void testFinishedAppRemovalAfterRMRestart() throws Exception { rm1.start(); MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore(); RMState rmState = memStore.getState(); - MockNM nm1 = - new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); + MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1); nm1.registerNode(); // create an app and finish the app. @@ -1689,8 +1678,7 @@ public void testClientRetryOnKillingApplication() throws Exception { // start RM MockRM rm1 = createMockRM(conf, memStore); rm1.start(); - MockNM nm1 = - new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); + MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1); nm1.registerNode(); RMApp app1 = @@ -1819,8 +1807,7 @@ public void testQueueMetricsOnRMRestart() throws Exception { // start RM MockRM rm1 = createMockRM(conf); rm1.start(); - MockNM nm1 = - new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); + MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1); nm1.registerNode(); QueueMetrics qm1 = rm1.getResourceScheduler().getRootQueueMetrics(); resetQueueMetrics(qm1); @@ -1866,7 +1853,7 @@ public void testQueueMetricsOnRMRestart() throws Exception { RMApp loadedApp1 = rm2.getRMContext().getRMApps().get(app1.getApplicationId()); nm1.nodeHeartbeat(true); - nm1 = new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService()); + nm1 = new MockNM("127.0.0.1:1234", 15120, rm2); NMContainerStatus status = TestRMRestart @@ -2018,8 +2005,7 @@ public void testSynchronouslyRenewDTOnRecovery() throws Exception { // start RM MockRM rm1 = createMockRM(conf); rm1.start(); - final MockNM nm1 = - new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); + final MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1); nm1.registerNode(); RMApp app0 = rm1.submitApp(200); final MockAM am0 = MockRM.launchAndRegisterAM(app0, rm1, nm1); @@ -2272,8 +2258,7 @@ public void testRMRestartFailAppAttempt() throws Exception { memStore.getState().getApplicationState(); // start RM rm1.start(); - MockNM nm1 = - new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); + MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1); nm1.registerNode(); // create app and launch the AM @@ -2399,8 +2384,7 @@ public void testRMRestartAfterPreemption() throws Exception { rm1.start(); CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); - MockNM nm1 = - new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); + MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1); nm1.registerNode(); int CONTAINER_MEMORY = 1024; // create app and launch the AM @@ -2453,8 +2437,7 @@ public void testRMRestartOnMissingAttempts() throws Exception { MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore(); // start RM rm1.start(); - MockNM nm1 = - new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); + MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1); nm1.registerNode(); // create an app and finish the app. diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index 5ed3278..e1c4c33 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -198,7 +198,6 @@ public void testDecommissionWithExcludeHosts() throws Exception { Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); nodeHeartbeat = nm2.nodeHeartbeat(true); Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); - rm.drainEvents(); // To test that IPs also work String ip = NetUtils.normalizeHostName("localhost"); @@ -217,14 +216,12 @@ public void testDecommissionWithExcludeHosts() throws Exception { nodeHeartbeat = nm3.nodeHeartbeat(true); Assert.assertTrue("The decommisioned metrics are not updated", NodeAction.SHUTDOWN.equals(nodeHeartbeat.getNodeAction())); - rm.drainEvents(); writeToHostsFile(""); rm.getNodesListManager().refreshNodes(conf); nm3 = rm.registerNode("localhost:4433", 1024); nodeHeartbeat = nm3.nodeHeartbeat(true); - rm.drainEvents(); Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); // decommissined node is 1 since 1 node is rejoined after updating exclude // file @@ -726,7 +723,7 @@ protected RMNodeLabelsManager createNodeLabelManager() { nodeHeartbeatResponse = resourceTrackerService.nodeHeartbeat(heartbeatReq); Assert.assertEquals("InValid Node Labels were not accepted by RM", - NodeAction.NORMAL, nodeHeartbeatResponse.getNodeAction()); + NodeAction.RESYNC, nodeHeartbeatResponse.getNodeAction()); assertCollectionEquals(nodeLabelsMgr.getNodeLabels().get(nodeId), oldLabels); Assert.assertFalse("Node Labels should not accepted by RM", @@ -912,7 +909,7 @@ public void testSetRMIdentifierInRegistration() throws Exception { rm = new MockRM(conf); rm.start(); - MockNM nm = new MockNM("host1:1234", 5120, rm.getResourceTrackerService()); + MockNM nm = new MockNM("host1:1234", 5120, rm); RegisterNodeManagerResponse response = nm.registerNode(); // Verify the RMIdentifier is correctly set in RegisterNodeManagerResponse @@ -1220,7 +1217,6 @@ public void handle(SchedulerEvent event) { MockNM nm2 = rm.registerNode("host2:5678", 5120); nm1.nodeHeartbeat(true); nm2.nodeHeartbeat(false); - rm.drainEvents(); checkUnhealthyNMCount(rm, nm2, true, 1); final int expectedNMs = ClusterMetrics.getMetrics().getNumActiveNMs(); QueueMetrics metrics = rm.getResourceScheduler().getRootQueueMetrics(); @@ -1231,7 +1227,6 @@ public void handle(SchedulerEvent event) { nm1 = rm.registerNode("host1:1234", 5120); NodeHeartbeatResponse response = nm1.nodeHeartbeat(true); Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction())); - rm.drainEvents(); Assert.assertEquals(expectedNMs, ClusterMetrics.getMetrics().getNumActiveNMs()); checkUnhealthyNMCount(rm, nm2, true, 1); @@ -1239,21 +1234,17 @@ public void handle(SchedulerEvent event) { nm2 = rm.registerNode("host2:5678", 5120); response = nm2.nodeHeartbeat(false); Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction())); - rm.drainEvents(); Assert.assertEquals(expectedNMs, ClusterMetrics.getMetrics().getNumActiveNMs()); checkUnhealthyNMCount(rm, nm2, true, 1); // unhealthy node changed back to healthy nm2 = rm.registerNode("host2:5678", 5120); response = nm2.nodeHeartbeat(true); - response = nm2.nodeHeartbeat(true); - rm.drainEvents(); Assert.assertEquals(5120 + 5120, metrics.getAvailableMB()); // reconnect of node with changed capability nm1 = rm.registerNode("host2:5678", 10240); response = nm1.nodeHeartbeat(true); - rm.drainEvents(); Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction())); Assert.assertEquals(5120 + 10240, metrics.getAvailableMB()); @@ -1262,22 +1253,18 @@ public void handle(SchedulerEvent event) { runningApps.add(ApplicationId.newInstance(1, 0)); nm1 = rm.registerNode("host2:5678", 15360, 2, runningApps); response = nm1.nodeHeartbeat(true); - rm.drainEvents(); Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction())); Assert.assertEquals(5120 + 15360, metrics.getAvailableMB()); // reconnect healthy node changing http port - nm1 = new MockNM("host1:1234", 5120, rm.getResourceTrackerService()); + nm1 = new MockNM("host1:1234", 5120, rm); nm1.setHttpPort(3); nm1.registerNode(); response = nm1.nodeHeartbeat(true); - response = nm1.nodeHeartbeat(true); - rm.drainEvents(); RMNode rmNode = rm.getRMContext().getRMNodes().get(nm1.getNodeId()); Assert.assertEquals(3, rmNode.getHttpPort()); Assert.assertEquals(5120, rmNode.getTotalCapability().getMemorySize()); Assert.assertEquals(5120 + 15360, metrics.getAvailableMB()); - } @Test @@ -1353,7 +1340,7 @@ public void testInvalidNMUnregistration() throws Exception { // 2. Exclude the same Node Manager host // 3. Give NM heartbeat to RM // 4. Unregister the Node Manager - MockNM nm1 = new MockNM("host1:1234", 5120, resourceTrackerService); + MockNM nm1 = new MockNM("host1:1234", 5120, rm); RegisterNodeManagerResponse response = nm1.registerNode(); Assert.assertEquals(NodeAction.NORMAL, response.getNodeAction()); int shutdownNMsCount = ClusterMetrics.getMetrics().getNumShutdownNMs(); @@ -1372,7 +1359,7 @@ public void testInvalidNMUnregistration() throws Exception { // 1. Register the Node Manager // 2. Exclude the same Node Manager host // 3. Unregister the Node Manager - MockNM nm2 = new MockNM("host2:1234", 5120, resourceTrackerService); + MockNM nm2 = new MockNM("host2:1234", 5120, rm); RegisterNodeManagerResponse response2 = nm2.registerNode(); Assert.assertEquals(NodeAction.NORMAL, response2.getNodeAction()); writeToHostsFile("host1"); @@ -1421,7 +1408,6 @@ public void testInitDecommMetricHelper(boolean hasIncludeList) nm2 = rm1.registerNode("host2:5678", 10240); nm1.nodeHeartbeat(true); nm2.nodeHeartbeat(true); - rm1.drainEvents(); Assert.assertEquals("Number of Decommissioned nodes should be 1", 1, ClusterMetrics.getMetrics().getNumDecommisionedNMs()); Assert.assertEquals("The inactiveRMNodes should contain an entry for the" + @@ -1434,7 +1420,6 @@ public void testInitDecommMetricHelper(boolean hasIncludeList) nm1 = rm1.registerNode("host1:1234", 5120); nm1.nodeHeartbeat(true); nm2.nodeHeartbeat(true); - rm1.drainEvents(); Assert.assertEquals("The decommissioned nodes metric should have " + "decremented to 0", 0, ClusterMetrics.getMetrics().getNumDecommisionedNMs()); @@ -1495,7 +1480,6 @@ public void testIncorrectRecommission() throws Exception { rm.getNodesListManager().refreshNodesGracefully(conf, null); rm.drainEvents(); nm1.nodeHeartbeat(true); - rm.drainEvents(); Assert.assertTrue("Node " + nm1.getNodeId().getHost() + " should be Decommissioned", rm.getRMContext() .getInactiveRMNodes().get(nm1.getNodeId()).getState() == NodeState @@ -1564,7 +1548,6 @@ public void testNodeRemovalUtil(boolean doGraceful) throws Exception { Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); nodeHeartbeat = nm3.nodeHeartbeat(true); Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); - rm.drainEvents(); Assert.assertEquals("All 3 nodes should be active", metrics.getNumActiveNMs(), 3); @@ -1579,7 +1562,6 @@ public void testNodeRemovalUtil(boolean doGraceful) throws Exception { } nm1.nodeHeartbeat(true); nm2.nodeHeartbeat(true); - rm.drainEvents(); Assert.assertTrue("Node should not be in active node list", !rmContext.getRMNodes().containsKey(nm2.getNodeId())); @@ -1612,13 +1594,11 @@ public void testNodeRemovalUtil(boolean doGraceful) throws Exception { writeToHostsFile("host1", ip, "host2"); refreshNodesOption(doGraceful, conf); nm2 = rm.registerNode("host2:5678", 10240); - rm.drainEvents(); writeToHostsFile("host1", ip); refreshNodesOption(doGraceful, conf); rm.waitForState(nm2.getNodeId(), doGraceful? NodeState.DECOMMISSIONING : NodeState.SHUTDOWN); nm2.nodeHeartbeat(true); - rm.drainEvents(); rmNode = rmContext.getInactiveRMNodes().get(nm2.getNodeId()); Assert.assertEquals("Node should be shutdown", rmNode.getState(), @@ -1634,7 +1614,6 @@ public void testNodeRemovalUtil(boolean doGraceful) throws Exception { refreshNodesOption(doGraceful, conf); nm2 = rm.registerNode("host2:5678", 10240); nodeHeartbeat = nm2.nodeHeartbeat(true); - rm.drainEvents(); Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); Assert.assertEquals("Shutdown nodes should be 0 now", metrics.getNumShutdownNMs(), 0); @@ -1740,7 +1719,6 @@ private void testNodeRemovalUtilLost(boolean doGraceful) throws Exception { ClusterMetrics clusterMetrics = ClusterMetrics.getMetrics(); ClusterMetrics metrics = clusterMetrics; assert (metrics != null); - rm.drainEvents(); //check all 3 nodes joined in as NORMAL NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true); Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); @@ -1748,7 +1726,6 @@ private void testNodeRemovalUtilLost(boolean doGraceful) throws Exception { Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); nodeHeartbeat = nm3.nodeHeartbeat(true); Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); - rm.drainEvents(); Assert.assertEquals("All 3 nodes should be active", metrics.getNumActiveNMs(), 3); int waitCount = 0; @@ -1781,7 +1758,6 @@ private void testNodeRemovalUtilLost(boolean doGraceful) throws Exception { refreshNodesOption(doGraceful, conf); nm1.nodeHeartbeat(true); nm3.nodeHeartbeat(true); - rm.drainEvents(); waitCount = 0; while(rmContext.getInactiveRMNodes().get( nm2.getNodeId()) != null && waitCount++ < 2){ @@ -1826,8 +1802,6 @@ private void testNodeRemovalUtilRebooted(boolean doGraceful) assert (metrics != null); NodeHeartbeatResponse nodeHeartbeat = nm2.nodeHeartbeat( new HashMap>(), true, -100); - rm.drainEvents(); - rm.drainEvents(); Assert.assertNotEquals("host2 should be a rebooted NM!", rmContext.getInactiveRMNodes().get(nm2.getNodeId()), null); @@ -1853,7 +1827,6 @@ private void testNodeRemovalUtilRebooted(boolean doGraceful) nm1.nodeHeartbeat(true); nm2.nodeHeartbeat(true); nm3.nodeHeartbeat(true); - rm.drainEvents(); int waitCount = 0; while(rmContext.getInactiveRMNodes().get( nm2.getNodeId()) != null && waitCount++ < 2){ @@ -1894,7 +1867,6 @@ private void testNodeRemovalUtilUnhealthy(boolean doGraceful) ClusterMetrics clusterMetrics = ClusterMetrics.getMetrics(); ClusterMetrics metrics = clusterMetrics; assert (metrics != null); - rm.drainEvents(); //check all 3 nodes joined in as NORMAL NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true); Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); @@ -1902,7 +1874,6 @@ private void testNodeRemovalUtilUnhealthy(boolean doGraceful) Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); nodeHeartbeat = nm3.nodeHeartbeat(true); Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); - rm.drainEvents(); Assert.assertEquals("All 3 nodes should be active", metrics.getNumActiveNMs(), 3); // node healthy @@ -1916,7 +1887,6 @@ private void testNodeRemovalUtilUnhealthy(boolean doGraceful) nm1.nodeHeartbeat(true); nm2.nodeHeartbeat(false); nm3.nodeHeartbeat(true); - rm.drainEvents(); if (!doGraceful) { Assert.assertNotEquals("host2 should be a shutdown NM!", rmContext.getInactiveRMNodes().get(nm2.getNodeId()), null); @@ -2035,7 +2005,6 @@ public void testNodeHeartBeatResponseForUnknownContainerCleanUp() rm.start(); MockNM nm1 = rm.registerNode("host1:1234", 5120); - rm.drainEvents(); // send 1st heartbeat nm1.nodeHeartbeat(true); @@ -2064,12 +2033,10 @@ public void testNodeHeartBeatResponseForUnknownContainerCleanUp() // Send unknown container status in heartbeat nm1.nodeHeartbeat(conts, true); - rm.drainEvents(); int containersToBeRemovedFromNM = 0; while (true) { NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true); - rm.drainEvents(); containersToBeRemovedFromNM += nodeHeartbeat.getContainersToBeRemovedFromNM().size(); // asserting for 2 since two unknown containers status has been sent @@ -2078,4 +2045,31 @@ public void testNodeHeartBeatResponseForUnknownContainerCleanUp() } } } + + @Test + public void testResponseIdOverflow() throws Exception { + Configuration conf = new Configuration(); + rm = new MockRM(conf); + rm.start(); + + MockNM nm1 = rm.registerNode("host1:1234", 5120); + + NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true); + Assert.assertEquals(NodeAction.NORMAL, nodeHeartbeat.getNodeAction()); + + // prepare the responseId that's about to overflow + RMNode node = rm.getRMContext().getRMNodes().get(nm1.getNodeId()); + node.getLastNodeHeartBeatResponse().setResponseId(Integer.MAX_VALUE); + + nm1.setResponseId(Integer.MAX_VALUE); + + // heartbeat twice and check responseId + nodeHeartbeat = nm1.nodeHeartbeat(true); + Assert.assertEquals(NodeAction.NORMAL, nodeHeartbeat.getNodeAction()); + Assert.assertEquals(0, nodeHeartbeat.getResponseId()); + + nodeHeartbeat = nm1.nodeHeartbeat(true); + Assert.assertEquals(NodeAction.NORMAL, nodeHeartbeat.getNodeAction()); + Assert.assertEquals(1, nodeHeartbeat.getResponseId()); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java index 2c37f44..7b7c02d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java @@ -150,8 +150,7 @@ public void testSchedulerRecovery() throws Exception { rm1 = new MockRM(conf); rm1.start(); - MockNM nm1 = - new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService()); + MockNM nm1 = new MockNM("127.0.0.1:1234", 8192, rm1); nm1.registerNode(); RMApp app1 = rm1.submitApp(200); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); @@ -296,8 +295,7 @@ public void testDynamicQueueRecovery() throws Exception { rm1 = new MockRM(schedulerConf); rm1.start(); - MockNM nm1 = - new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService()); + MockNM nm1 = new MockNM("127.0.0.1:1234", 8192, rm1); nm1.registerNode(); // 2. Run plan follower to update the added node & then submit app to // dynamic queue. @@ -568,8 +566,7 @@ public void testRMRestartWithRemovedQueue() throws Exception{ conf.set(YarnConfiguration.YARN_ADMIN_ACL, ""); rm1 = new MockRM(conf); rm1.start(); - MockNM nm1 = - new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService()); + MockNM nm1 = new MockNM("127.0.0.1:1234", 8192, rm1); nm1.registerNode(); final RMApp app1 = rm1.submitApp(1024, "app1", USER_1, null); MockAM am1 = MockRM.launchAndRegisterAM(app1,rm1, nm1); @@ -618,10 +615,8 @@ public void testCapacitySchedulerRecovery() throws Exception { setupQueueConfiguration(csConf); rm1 = new MockRM(csConf); rm1.start(); - MockNM nm1 = - new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService()); - MockNM nm2 = - new MockNM("127.1.1.1:4321", 8192, rm1.getResourceTrackerService()); + MockNM nm1 = new MockNM("127.0.0.1:1234", 8192, rm1); + MockNM nm2 = new MockNM("127.1.1.1:4321", 8192, rm1); nm1.registerNode(); nm2.registerNode(); RMApp app1_1 = rm1.submitApp(1024, "app1_1", USER_1, null, A); @@ -777,8 +772,7 @@ public void testCapacityLeafQueueBecomesParentOnRecovery() throws Exception { setupQueueConfiguration(csConf); rm1 = new MockRM(csConf); rm1.start(); - MockNM nm = - new MockNM("127.1.1.1:4321", 8192, rm1.getResourceTrackerService()); + MockNM nm = new MockNM("127.1.1.1:4321", 8192, rm1); nm.registerNode(); // Submit an app to QueueB. @@ -822,10 +816,8 @@ public void testCapacitySchedulerQueueRemovedRecovery() throws Exception { setupQueueConfiguration(csConf); rm1 = new MockRM(csConf); rm1.start(); - MockNM nm1 = - new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService()); - MockNM nm2 = - new MockNM("127.1.1.1:4321", 8192, rm1.getResourceTrackerService()); + MockNM nm1 = new MockNM("127.0.0.1:1234", 8192, rm1); + MockNM nm2 = new MockNM("127.1.1.1:4321", 8192, rm1); nm1.registerNode(); nm2.registerNode(); RMApp app1_1 = rm1.submitApp(1024, "app1_1", USER_1, null, A); @@ -876,8 +868,7 @@ public void testAMfailedBetweenRMRestart() throws Exception { conf.setLong(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS, 0); rm1 = new MockRM(conf); rm1.start(); - MockNM nm1 = - new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService()); + MockNM nm1 = new MockNM("127.0.0.1:1234", 8192, rm1); nm1.registerNode(); RMApp app1 = rm1.submitApp(200); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); @@ -909,8 +900,7 @@ public void testAMfailedBetweenRMRestart() throws Exception { rm2.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 2, nm1); - MockNM nm2 = - new MockNM("127.1.1.1:4321", 8192, rm2.getResourceTrackerService()); + MockNM nm2 = new MockNM("127.1.1.1:4321", 8192, rm2); NMContainerStatus previousAttemptContainer = TestRMRestart.createNMContainerStatus(am1.getApplicationAttemptId(), 4, ContainerState.RUNNING); @@ -927,8 +917,7 @@ public void testAMfailedBetweenRMRestart() throws Exception { public void testContainersNotRecoveredForCompletedApps() throws Exception { rm1 = new MockRM(conf); rm1.start(); - MockNM nm1 = - new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService()); + MockNM nm1 = new MockNM("127.0.0.1:1234", 8192, rm1); nm1.registerNode(); RMApp app1 = rm1.submitApp(200); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); @@ -965,8 +954,7 @@ public void testAppReregisterOnRMWorkPreservingRestart() throws Exception { // start RM rm1 = new MockRM(conf); rm1.start(); - MockNM nm1 = - new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); + MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1); nm1.registerNode(); // create app and launch the AM @@ -994,8 +982,7 @@ public void testAppReregisterOnRMWorkPreservingRestart() throws Exception { public void testAMContainerStatusWithRMRestart() throws Exception { rm1 = new MockRM(conf); rm1.start(); - MockNM nm1 = - new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService()); + MockNM nm1 = new MockNM("127.0.0.1:1234", 8192, rm1); nm1.registerNode(); RMApp app1_1 = rm1.submitApp(1024); MockAM am1_1 = MockRM.launchAndRegisterAM(app1_1, rm1, nm1); @@ -1028,8 +1015,7 @@ public void testRecoverSchedulerAppAndAttemptSynchronously() throws Exception { // start RM rm1 = new MockRM(conf); rm1.start(); - MockNM nm1 = - new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); + MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1); nm1.registerNode(); // create app and launch the AM @@ -1058,7 +1044,7 @@ public void testRecoverSchedulerAppAndAttemptSynchronously() throws Exception { @Test (timeout = 50000) public void testReleasedContainerNotRecovered() throws Exception { rm1 = new MockRM(conf); - MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService()); + MockNM nm1 = new MockNM("h1:1234", 15120, rm1); nm1.registerNode(); rm1.start(); @@ -1155,8 +1141,7 @@ public void testNewContainersNotAllocatedDuringSchedulerRecovery() YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS, 4000); rm1 = new MockRM(conf); rm1.start(); - MockNM nm1 = - new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService()); + MockNM nm1 = new MockNM("127.0.0.1:1234", 8192, rm1); nm1.registerNode(); RMApp app1 = rm1.submitApp(200); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); @@ -1208,8 +1193,7 @@ public void testRetriedFinishApplicationMasterRequest() // start RM rm1 = new MockRM(conf); rm1.start(); - MockNM nm1 = - new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); + MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1); nm1.registerNode(); // create app and launch the AM @@ -1241,8 +1225,7 @@ public void testAppFailedToRenewTokenOnRecovery() throws Exception { UserGroupInformation.setConfiguration(conf); MockRM rm1 = new TestSecurityMockRM(conf); rm1.start(); - MockNM nm1 = - new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService()); + MockNM nm1 = new MockNM("127.0.0.1:1234", 8192, rm1); nm1.registerNode(); RMApp app1 = rm1.submitApp(200); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); @@ -1286,8 +1269,7 @@ public void addApplicationSync(ApplicationId applicationId, public void testAppFailToValidateResourceRequestOnRecovery() throws Exception{ rm1 = new MockRM(conf); rm1.start(); - MockNM nm1 = - new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService()); + MockNM nm1 = new MockNM("127.0.0.1:1234", 8192, rm1); nm1.registerNode(); RMApp app1 = rm1.submitApp(200); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); @@ -1307,8 +1289,7 @@ public void testContainerCompleteMsgNotLostAfterAMFailedAndRMRestart() throws Ex rm1 = new MockRM(conf); rm1.start(); - MockNM nm1 = - new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService()); + MockNM nm1 = new MockNM("127.0.0.1:1234", 8192, rm1); nm1.registerNode(); // submit app with keepContainersAcrossApplicationAttempts true @@ -1378,7 +1359,7 @@ public void testAppStateSavedButAttemptStateNotSaved() throws Exception { rm1 = new MockRM(conf, memStore); rm1.start(); - MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); + MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1); nm1.registerNode(); RMApp app1 = rm1.submitApp(200); @@ -1410,8 +1391,7 @@ public void testUAMRecoveryOnRMWorkPreservingRestart() throws Exception { // start RM rm1 = new MockRM(conf); rm1.start(); - MockNM nm1 = - new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); + MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1); nm1.registerNode(); // create app and launch the UAM diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java index 528afac..bd986e3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java @@ -72,11 +72,9 @@ public void testAMRestartWithExistingContainers() throws Exception { rm1.submitApp(200, "name", "user", new HashMap(), false, "default", -1, null, "MAPREDUCE", false, true); - MockNM nm1 = - new MockNM("127.0.0.1:1234", 10240, rm1.getResourceTrackerService()); + MockNM nm1 = new MockNM("127.0.0.1:1234", 10240, rm1); nm1.registerNode(); - MockNM nm2 = - new MockNM("127.0.0.1:2351", 4089, rm1.getResourceTrackerService()); + MockNM nm2 = new MockNM("127.0.0.1:2351", 4089, rm1); nm2.registerNode(); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); @@ -276,11 +274,9 @@ public void testNMTokensRebindOnAMRestart() throws Exception { rm1.submitApp(200, "myname", "myuser", new HashMap(), false, "default", -1, null, "MAPREDUCE", false, true); - MockNM nm1 = - new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService()); + MockNM nm1 = new MockNM("127.0.0.1:1234", 8000, rm1); nm1.registerNode(); - MockNM nm2 = - new MockNM("127.1.1.1:4321", 8000, rm1.getResourceTrackerService()); + MockNM nm2 = new MockNM("127.1.1.1:4321", 8000, rm1); nm2.registerNode(); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); @@ -382,8 +378,7 @@ public void testShouldNotCountFailureToMaxAttemptRetry() throws Exception { conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); MockRM rm1 = new MockRM(conf); rm1.start(); - MockNM nm1 = - new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService()); + MockNM nm1 = new MockNM("127.0.0.1:1234", 8000, rm1); nm1.registerNode(); RMApp app1 = rm1.submitApp(200); RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); @@ -452,8 +447,7 @@ public void testShouldNotCountFailureToMaxAttemptRetry() throws Exception { RMAppAttempt attempt4 = app1.getCurrentAppAttempt(); // create second NM, and register to rm1 - MockNM nm2 = - new MockNM("127.0.0.1:2234", 8000, rm1.getResourceTrackerService()); + MockNM nm2 = new MockNM("127.0.0.1:2234", 8000, rm1); nm2.registerNode(); // nm1 heartbeats to report unhealthy // This will mimic ContainerExitStatus.ABORT @@ -509,8 +503,7 @@ public void testMaxAttemptOneMeansOne() throws Exception { conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); MockRM rm1 = new MockRM(conf); rm1.start(); - MockNM nm1 = - new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService()); + MockNM nm1 = new MockNM("127.0.0.1:1234", 8000, rm1); nm1.registerNode(); RMApp app1 = rm1.submitApp(200); RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); @@ -549,8 +542,7 @@ public void testPreemptedAMRestartOnRMRestart() throws Exception { MockRM rm1 = new MockRM(conf); MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore(); rm1.start(); - MockNM nm1 = - new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService()); + MockNM nm1 = new MockNM("127.0.0.1:1234", 8000, rm1); nm1.registerNode(); RMApp app1 = rm1.submitApp(200); RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); @@ -632,8 +624,7 @@ public void testRMRestartOrFailoverNotCountedForAMFailures() rm1.start(); CapacityScheduler scheduler = (CapacityScheduler) rm1.getResourceScheduler(); - MockNM nm1 = - new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService()); + MockNM nm1 = new MockNM("127.0.0.1:1234", 8000, rm1); nm1.registerNode(); RMApp app1 = rm1.submitApp(200); // AM should be restarted even though max-am-attempt is 1. @@ -704,8 +695,7 @@ public void testRMAppAttemptFailuresValidityInterval() throws Exception { MockRM rm1 = new MockRM(conf); MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore(); rm1.start(); - MockNM nm1 = - new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService()); + MockNM nm1 = new MockNM("127.0.0.1:1234", 8000, rm1); nm1.registerNode(); // set window size to a larger number : 60s @@ -845,8 +835,7 @@ public void testAMRestartNotLostContainerCompleteMsg() throws Exception { rm1.submitApp(200, "name", "user", new HashMap(), false, "default", -1, null, "MAPREDUCE", false, true); - MockNM nm1 = - new MockNM("127.0.0.1:1234", 10240, rm1.getResourceTrackerService()); + MockNM nm1 = new MockNM("127.0.0.1:1234", 10240, rm1); nm1.registerNode(); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); @@ -940,8 +929,7 @@ public void testAMRestartNotLostContainerAfterAttemptFailuresValidityInterval() MockRM rm1 = new MockRM(conf); rm1.start(); - MockNM nm1 = - new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService()); + MockNM nm1 = new MockNM("127.0.0.1:1234", 8000, rm1); nm1.registerNode(); // set window size to 10s and enable keepContainers diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java index 6a7325c..c35fc5c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java @@ -204,8 +204,7 @@ public void testDecommissioningNodeReconnect() throws Exception { MockRM rm = new MockRM(); rm.start(); - MockNM nm1 = - new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService()); + MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm); nm1.registerNode(); rm.waitForState(nm1.getNodeId(), NodeState.RUNNING); @@ -214,8 +213,7 @@ public void testDecommissioningNodeReconnect() RMNodeEventType.GRACEFUL_DECOMMISSION)); rm.waitForState(nm1.getNodeId(), NodeState.DECOMMISSIONING); - MockNM nm2 = - new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService()); + MockNM nm2 = new MockNM("127.0.0.1:1234", 15120, rm); RegisterNodeManagerResponse response = nm2.registerNode(); // not SHUTDOWN Assert.assertTrue(response.getNodeAction().equals(NodeAction.NORMAL)); @@ -230,8 +228,7 @@ public void testRMNodeStatusAfterReconnect() throws Exception { MockRM rm = new MockRM(); rm.start(); - MockNM nm1 = - new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService()); + MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm); nm1.registerNode(); int i = 0; while(i < 3) { @@ -240,8 +237,7 @@ public void testRMNodeStatusAfterReconnect() throws Exception { i++; } - MockNM nm2 = - new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService()); + MockNM nm2 = new MockNM("127.0.0.1:1234", 15120, rm); nm2.registerNode(); RMNode rmNode = rm.getRMContext().getRMNodes().get(nm2.getNodeId()); nm2.nodeHeartbeat(true); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestApplicationLifetimeMonitor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestApplicationLifetimeMonitor.java index f7e76bb..bb491b7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestApplicationLifetimeMonitor.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestApplicationLifetimeMonitor.java @@ -167,8 +167,7 @@ public void testApplicationLifetimeOnRMRestart() throws Exception { MockRM rm1 = new MockRM(conf); MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore(); rm1.start(); - MockNM nm1 = - new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService()); + MockNM nm1 = new MockNM("127.0.0.1:1234", 8192, rm1); nm1.registerNode(); nm1.nodeHeartbeat(true); @@ -251,8 +250,7 @@ public synchronized void updateApplicationStateInternal( memStore.init(conf); rm1 = new MockRM(conf, memStore); rm1.start(); - MockNM nm1 = - new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService()); + MockNM nm1 = new MockNM("127.0.0.1:1234", 8192, rm1); nm1.registerNode(); nm1.nodeHeartbeat(true); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java index 60b9e4b..b04b51a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java @@ -385,8 +385,7 @@ public void testReleasedContainerIfAppAttemptisNull() throws Exception { MockRM rm1 = new MockRM(conf); try { rm1.start(); - MockNM nm1 = - new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService()); + MockNM nm1 = new MockNM("127.0.0.1:1234", 8192, rm1); nm1.registerNode(); AbstractYarnScheduler scheduler = @@ -434,8 +433,7 @@ public void testContainerReleasedByNode() throws Exception { rm1.submitApp(200, "name", "user", new HashMap(), false, "default", -1, null, "Test", false, true); - MockNM nm1 = - new MockNM("127.0.0.1:1234", 10240, rm1.getResourceTrackerService()); + MockNM nm1 = new MockNM("127.0.0.1:1234", 10240, rm1); nm1.registerNode(); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); @@ -547,12 +545,10 @@ public void testResourceRequestRestoreWhenRMContainerIsAtAllocated() rm1.submitApp(200, "name", "user", new HashMap(), false, "default", -1, null, "Test", false, true); - MockNM nm1 = - new MockNM("127.0.0.1:1234", 10240, rm1.getResourceTrackerService()); + MockNM nm1 = new MockNM("127.0.0.1:1234", 10240, rm1); nm1.registerNode(); - MockNM nm2 = - new MockNM("127.0.0.1:2351", 10240, rm1.getResourceTrackerService()); + MockNM nm2 = new MockNM("127.0.0.1:2351", 10240, rm1); nm2.registerNode(); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); @@ -635,9 +631,8 @@ public void testResourceRequestRecoveryToTheRightAppAttempt() RMApp rmApp = rm.submitApp(200, "name", "user", new HashMap(), false, "default", -1, - null, "Test", false, true); - MockNM node = - new MockNM("127.0.0.1:1234", 10240, rm.getResourceTrackerService()); + null, "Test", false, true); + MockNM node = new MockNM("127.0.0.1:1234", 10240, rm); node.registerNode(); MockAM am1 = MockRM.launchAndRegisterAM(rmApp, rm, node); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulingWithAllocationRequestId.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulingWithAllocationRequestId.java index e60fd6f..dfac22e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulingWithAllocationRequestId.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulingWithAllocationRequestId.java @@ -175,13 +175,11 @@ public void testMultipleAppsWithAllocationReqId() throws Exception { // Register node1 String host0 = "host_0"; String host1 = "host_1"; - MockNM nm1 = - new MockNM(host0 + ":1234", 8 * GB, rm.getResourceTrackerService()); + MockNM nm1 = new MockNM(host0 + ":1234", 8 * GB, rm); nm1.registerNode(); // Register node2 - MockNM nm2 = - new MockNM(host1 + ":2351", 8 * GB, rm.getResourceTrackerService()); + MockNM nm2 = new MockNM(host1 + ":2351", 8 * GB, rm); nm2.registerNode(); // submit 1st app diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java index cad0151..eb5e22d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java @@ -409,8 +409,7 @@ public void testRMRestartWithChangeInPriority() throws Exception { MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore(); rm1.start(); - MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, - rm1.getResourceTrackerService()); + MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1); nm1.registerNode(); Priority appPriority1 = Priority.newInstance(5); @@ -608,7 +607,7 @@ public void testOrderOfActivatingThePriorityApplicationOnRMRestart() rm1.start(); MockNM nm1 = - new MockNM("127.0.0.1:1234", 16384, rm1.getResourceTrackerService()); + new MockNM("127.0.0.1:1234", 16384, rm1); nm1.registerNode(); rm1.drainEvents(); @@ -753,8 +752,7 @@ public void testUpdatePriorityOnPendingAppAndKillAttempt() throws Exception { killAppAndVerifyOrderingPolicy(rm, defaultQueue, 0, 0, app1); // Check ordering policy size when resource is added - MockNM nm1 = - new MockNM("127.0.0.1:1234", 8096, rm.getResourceTrackerService()); + MockNM nm1 = new MockNM("127.0.0.1:1234", 8096, rm); nm1.registerNode(); RMApp app2 = rm.submitApp(1024, Priority.newInstance(appPriority)); Assert.assertEquals("Pending apps should be 0", 0, appsPending.size()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index a526222..63289a7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -1618,8 +1618,7 @@ public void testPreemptionInfo() throws Exception { CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); // start NM - MockNM nm1 = - new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); + MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1); nm1.registerNode(); // create app and launch the AM @@ -3217,13 +3216,13 @@ public void testSchedulerKeyGarbageCollection() throws Exception { rm.start(); HashMap nodes = new HashMap<>(); - MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService()); + MockNM nm1 = new MockNM("h1:1234", 4096, rm); nodes.put(nm1.getNodeId(), nm1); - MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService()); + MockNM nm2 = new MockNM("h2:1234", 4096, rm); nodes.put(nm2.getNodeId(), nm2); - MockNM nm3 = new MockNM("h3:1234", 4096, rm.getResourceTrackerService()); + MockNM nm3 = new MockNM("h3:1234", 4096, rm); nodes.put(nm3.getNodeId(), nm3); - MockNM nm4 = new MockNM("h4:1234", 4096, rm.getResourceTrackerService()); + MockNM nm4 = new MockNM("h4:1234", 4096, rm); nodes.put(nm4.getNodeId(), nm4); nm1.registerNode(); nm2.registerNode(); @@ -3377,8 +3376,7 @@ public void testHierarchyQueuesCurrentLimits() throws Exception { conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true); MockRM rm1 = new MockRM(conf); rm1.start(); - MockNM nm1 = - new MockNM("127.0.0.1:1234", 100 * GB, rm1.getResourceTrackerService()); + MockNM nm1 = new MockNM("127.0.0.1:1234", 100 * GB, rm1); nm1.registerNode(); RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "b1"); @@ -3457,8 +3455,7 @@ public void testParentQueueMaxCapsAreRespected() throws Exception { MockRM rm1 = new MockRM(conf); rm1.start(); - MockNM nm1 = - new MockNM("127.0.0.1:1234", 24 * GB, rm1.getResourceTrackerService()); + MockNM nm1 = new MockNM("127.0.0.1:1234", 24 * GB, rm1); nm1.registerNode(); // Launch app1 in a1, resource usage is 1GB (am) + 4GB * 2 = 9GB @@ -3508,11 +3505,11 @@ protected RMNodeLabelsManager createNodeLabelManager() { rm.start(); MockNM nm1 = // label = x - new MockNM("h1:1234", 200 * GB, rm.getResourceTrackerService()); + new MockNM("h1:1234", 200 * GB, rm); nm1.registerNode(); MockNM nm2 = // label = "" - new MockNM("h2:1234", 200 * GB, rm.getResourceTrackerService()); + new MockNM("h2:1234", 200 * GB, rm); nm2.registerNode(); // Launch app1 in queue=a1 @@ -3917,7 +3914,7 @@ protected RMNodeLabelsManager createNodeLabelManager() { rm.start(); MockNM nm1 = // label = "" - new MockNM("h1:1234", 200 * GB, rm.getResourceTrackerService()); + new MockNM("h1:1234", 200 * GB, rm); nm1.registerNode(); // Launch app1 in queue=a1 @@ -4593,8 +4590,7 @@ protected RMNodeLabelsManager createNodeLabelManager() { }; rm.start(); - MockNM nm1 = - new MockNM("h1:1234", 200 * GB, rm.getResourceTrackerService()); + MockNM nm1 = new MockNM("h1:1234", 200 * GB, rm); nm1.registerNode(); // Launch app1 in queue=a1 diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java index 740ef33..25cd904 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java @@ -1842,8 +1842,7 @@ public RMNodeLabelsManager createNodeLabelManager() { CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); - MockNM nm1 = - new MockNM("h1:1234", 24 * GB, rm.getResourceTrackerService()); + MockNM nm1 = new MockNM("h1:1234", 24 * GB, rm); nm1.registerNode(); // Launch app1 in a1, resource usage is 1GB (am) + 4GB * 2 = 9GB diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 941c215..0a2400c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -5118,8 +5118,7 @@ public void testRefreshQueuesWhenRMHA() throws Exception { MemoryRMStateStore memStore = (MemoryRMStateStore) rm2.getRMStateStore(); rm2.start(); - MockNM nm = - new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService()); + MockNM nm = new MockNM("127.0.0.1:1234", 15120, rm2); nm.registerNode(); rm2.getAdminService().transitionToActive(requestInfo); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java index c708b92..1b55cc5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java @@ -964,8 +964,7 @@ public Boolean get() { }, 1000, 20000); // check nm can retrieve the token - final MockNM nm1 = - new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService()); + final MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm); nm1.registerNode(); NodeHeartbeatResponse response = nm1.nodeHeartbeat(true); ByteBuffer tokenBuffer = @@ -1057,8 +1056,7 @@ protected void renewToken(final DelegationTokenToRenew dttr) rm2.start(); // check nm can retrieve the token - final MockNM nm1 = - new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService()); + final MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm2); nm1.registerNode(); NodeHeartbeatResponse response = nm1.nodeHeartbeat(true); ByteBuffer tokenBuffer = @@ -1112,8 +1110,7 @@ public Boolean get() { }, 1000, 20000); // check nm can retrieve the token - final MockNM nm1 = - new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService()); + final MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm); nm1.registerNode(); NodeHeartbeatResponse response = nm1.nodeHeartbeat(true); ByteBuffer tokenBuffer = @@ -1133,8 +1130,7 @@ public Boolean get() { public void testAppSubmissionWithPreviousToken() throws Exception{ MockRM rm = new TestSecurityMockRM(conf, null); rm.start(); - final MockNM nm1 = - new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService()); + final MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm); nm1.registerNode(); // create Token1: @@ -1195,8 +1191,7 @@ public void testFSLeakInObtainSystemTokensForUser() throws Exception{ public void testCancelWithMultipleAppSubmissions() throws Exception{ MockRM rm = new TestSecurityMockRM(conf, null); rm.start(); - final MockNM nm1 = - new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService()); + final MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm); nm1.registerNode(); // create Token1: @@ -1310,8 +1305,7 @@ public void testRenewTokenUsingTokenConfProvidedByApp() throws Exception{ MockRM rm = new TestSecurityMockRM(conf, null); rm.start(); - final MockNM nm1 = - new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService()); + final MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm); nm1.registerNode(); // create a token @@ -1365,8 +1359,7 @@ public void testTokensConfExceedLimit() throws Exception { conf.setInt(YarnConfiguration.RM_DELEGATION_TOKEN_MAX_CONF_SIZE, 100); MockRM rm = new TestSecurityMockRM(conf, null); rm.start(); - final MockNM nm1 = - new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService()); + final MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm); nm1.registerNode(); // create a token diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivities.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivities.java index 1e61186..2965075 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivities.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivities.java @@ -58,8 +58,7 @@ public void testAssignMultipleContainersPerNodeHeartbeat() //Start RM so that it accepts app submissions rm.start(); - MockNM nm = new MockNM("127.0.0.1:1234", 24 * 1024, - rm.getResourceTrackerService()); + MockNM nm = new MockNM("127.0.0.1:1234", 24 * 1024, rm); nm.registerNode(); try { @@ -116,8 +115,7 @@ public void testAssignWithoutAvailableResource() throws Exception { //Start RM so that it accepts app submissions rm.start(); - MockNM nm = new MockNM("127.0.0.1:1234", 1 * 1024, - rm.getResourceTrackerService()); + MockNM nm = new MockNM("127.0.0.1:1234", 1 * 1024, rm); nm.registerNode(); try { @@ -199,8 +197,7 @@ public void testWrongNodeId() throws Exception { //Start RM so that it accepts app submissions rm.start(); - MockNM nm = new MockNM("127.0.0.1:1234", 24 * 1024, - rm.getResourceTrackerService()); + MockNM nm = new MockNM("127.0.0.1:1234", 24 * 1024, rm); nm.registerNode(); try { @@ -248,10 +245,8 @@ public void testReserveNewContainer() throws Exception { //Start RM so that it accepts app submissions rm.start(); - MockNM nm1 = new MockNM("127.0.0.1:1234", 4 * 1024, - rm.getResourceTrackerService()); - MockNM nm2 = new MockNM("127.0.0.2:1234", 4 * 1024, - rm.getResourceTrackerService()); + MockNM nm1 = new MockNM("127.0.0.1:1234", 4 * 1024, rm); + MockNM nm2 = new MockNM("127.0.0.2:1234", 4 * 1024, rm); nm1.registerNode(); nm2.registerNode(); @@ -370,8 +365,7 @@ public void testActivityJSON() throws Exception { //Start RM so that it accepts app submissions rm.start(); - MockNM nm = new MockNM("127.0.0.1:1234", 24 * 1024, - rm.getResourceTrackerService()); + MockNM nm = new MockNM("127.0.0.1:1234", 24 * 1024, rm); nm.registerNode(); try { @@ -518,8 +512,7 @@ public void testAppActivityJSON() throws Exception { //Start RM so that it accepts app submissions rm.start(); - MockNM nm = new MockNM("127.0.0.1:1234", 24 * 1024, - rm.getResourceTrackerService()); + MockNM nm = new MockNM("127.0.0.1:1234", 24 * 1024, rm); nm.registerNode(); try { @@ -564,8 +557,7 @@ public void testAppAssignMultipleContainersPerNodeHeartbeat() //Start RM so that it accepts app submissions rm.start(); - MockNM nm = new MockNM("127.0.0.1:1234", 24 * 1024, - rm.getResourceTrackerService()); + MockNM nm = new MockNM("127.0.0.1:1234", 24 * 1024, rm); nm.registerNode(); try { @@ -618,8 +610,7 @@ public void testAppAssignWithoutAvailableResource() throws Exception { //Start RM so that it accepts app submissions rm.start(); - MockNM nm = new MockNM("127.0.0.1:1234", 1 * 1024, - rm.getResourceTrackerService()); + MockNM nm = new MockNM("127.0.0.1:1234", 1 * 1024, rm); nm.registerNode(); try { @@ -700,10 +691,8 @@ public void testAppReserveNewContainer() throws Exception { //Start RM so that it accepts app submissions rm.start(); - MockNM nm1 = new MockNM("127.0.0.1:1234", 4 * 1024, - rm.getResourceTrackerService()); - MockNM nm2 = new MockNM("127.0.0.2:1234", 4 * 1024, - rm.getResourceTrackerService()); + MockNM nm1 = new MockNM("127.0.0.1:1234", 4 * 1024, rm); + MockNM nm2 = new MockNM("127.0.0.2:1234", 4 * 1024, rm); nm1.registerNode(); nm2.registerNode(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java index de282fd..8543a1a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java @@ -50,6 +50,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -378,7 +379,7 @@ public synchronized void restartResourceManager(int index) resourceManagers[index].stop(); resourceManagers[index] = null; } - resourceManagers[index] = new ResourceManager(); + resourceManagers[index] = createResourceManager(); initResourceManager(index, getConfig()); startResourceManager(index); } @@ -442,6 +443,14 @@ public ResourceManager getResourceManager(int i) { return this.resourceManagers[i]; } + public void drainResourceManagerDispatcher() { + ResourceManager rm = getResourceManager(); + if (rm == null) { + return; + } + ((DrainDispatcher) rm.getRmDispatcher()).await(); + } + public NodeManager getNodeManager(int i) { return this.nodeManagers[i]; } @@ -621,9 +630,11 @@ protected synchronized void serviceStop() throws Exception { public class CustomNodeManager extends NodeManager { protected NodeStatus nodeStatus; + protected int lastResponseId = -1; public void setNodeStatus(NodeStatus status) { this.nodeStatus = status; + this.nodeStatus.setResponseId(lastResponseId); } /** @@ -633,11 +644,13 @@ public void setNodeStatus(NodeStatus status) { */ protected NodeStatus getSimulatedNodeStatus(NodeStatus currentStatus) { if(nodeStatus == null) { + lastResponseId = currentStatus.getResponseId(); return currentStatus; } else { // Increment response ID, the RMNodeStatusEvent will not get recorded // for a duplicate heartbeat nodeStatus.setResponseId(nodeStatus.getResponseId() + 1); + lastResponseId = nodeStatus.getResponseId(); return nodeStatus; } } @@ -658,6 +671,8 @@ protected NodeStatusUpdater createNodeStatusUpdater(Context context, // Allow simulation of nodestatus @Override protected NodeStatus getNodeStatus(int responseId) throws IOException { + // Drain RM dispatch before the next NM heartbeat + drainResourceManagerDispatcher(); return getSimulatedNodeStatus(super.getNodeStatus(responseId)); } }; @@ -676,6 +691,8 @@ protected NodeStatusUpdater createNodeStatusUpdater(Context context, // Allow simulation of nodestatus @Override protected NodeStatus getNodeStatus(int responseId) throws IOException { + // Drain RM dispatch before the next NM heartbeat + drainResourceManagerDispatcher(); return getSimulatedNodeStatus(super.getNodeStatus(responseId)); } @@ -845,6 +862,11 @@ protected ResourceManager createResourceManager() { protected void doSecureLogin() throws IOException { // Don't try to login using keytab in the testcases. } + + @Override + protected Dispatcher createDispatcher() { + return new DrainDispatcher(); + } }; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYarnClusterNodeUtilization.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYarnClusterNodeUtilization.java index a941302..27025a3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYarnClusterNodeUtilization.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYarnClusterNodeUtilization.java @@ -22,7 +22,6 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -import java.io.IOException; import java.util.ArrayList; import org.apache.hadoop.conf.Configuration; @@ -32,9 +31,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.MiniYARNCluster.CustomNodeManager; -import org.apache.hadoop.yarn.server.api.ResourceTracker; -import org.apache.hadoop.yarn.server.api.ServerRMProxy; -import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; @@ -87,53 +83,19 @@ public void setup() { assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex()); nm = (CustomNodeManager)cluster.getNodeManager(0); - int responseId = 1; - nodeStatus = createNodeStatus(nm.getNMContext().getNodeId(), responseId, + nodeStatus = createNodeStatus(nm.getNMContext().getNodeId(), 0, CONTAINER_PMEM_1, CONTAINER_VMEM_1, CONTAINER_CPU_1, NODE_PMEM_1, NODE_VMEM_1, NODE_CPU_1); nm.setNodeStatus(nodeStatus); } /** - * Simulates a NM heartbeat using the simulated NodeStatus fixture. Verify - * both the RMNode and SchedulerNode have been updated with the new - * utilization. - */ - @Test(timeout=60000) - public void testUpdateNodeUtilization() - throws InterruptedException, IOException, YarnException { - assertTrue("NMs fail to connect to the RM", - cluster.waitForNodeManagersToConnect(10000)); - - // Simulate heartbeat using NodeStatus fixture - NodeHeartbeatRequest request = - NodeHeartbeatRequest.newInstance(nodeStatus, null, null, null); - ResourceTracker tracker = - ServerRMProxy.createRMProxy(conf, ResourceTracker.class); - tracker.nodeHeartbeat(request); - - // Give the heartbeat time to propagate to the RM - verifySimulatedUtilization(); - - // Alter utilization - int responseId = 10; - nodeStatus = createNodeStatus(nm.getNMContext().getNodeId(), responseId, - CONTAINER_PMEM_2, CONTAINER_VMEM_2, CONTAINER_CPU_2, - NODE_PMEM_2, NODE_VMEM_2, NODE_CPU_2); - nm.setNodeStatus(nodeStatus); - tracker.nodeHeartbeat(request); - - // Give the heartbeat time to propagate to the RM - verifySimulatedUtilization(); - } - - /** * Trigger the NM to send a heartbeat using the simulated NodeStatus fixture. * Verify both the RMNode and SchedulerNode have been updated with the new * utilization. */ @Test(timeout=60000) - public void testMockNodeStatusHeartbeat() + public void testUpdateNodeUtilization() throws InterruptedException, YarnException { assertTrue("NMs fail to connect to the RM", cluster.waitForNodeManagersToConnect(10000)); @@ -145,8 +107,7 @@ public void testMockNodeStatusHeartbeat() verifySimulatedUtilization(); // Alter utilization - int responseId = 20; - nodeStatus = createNodeStatus(nm.getNMContext().getNodeId(), responseId, + nodeStatus = createNodeStatus(nm.getNMContext().getNodeId(), 0, CONTAINER_PMEM_2, CONTAINER_VMEM_2, CONTAINER_CPU_2, NODE_PMEM_2, NODE_VMEM_2, NODE_CPU_2); nm.setNodeStatus(nodeStatus); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestRMNMSecretKeys.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestRMNMSecretKeys.java index ba14491..0037983 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestRMNMSecretKeys.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestRMNMSecretKeys.java @@ -28,13 +28,12 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.event.Dispatcher; -import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; -import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService; import org.apache.kerby.util.IOUtil; import org.junit.Test; @@ -84,21 +83,20 @@ public void testNMUpdation() throws Exception { private void validateRMNMKeyExchange(YarnConfiguration conf) throws Exception { // Default rolling and activation intervals are large enough, no need to // intervene - final DrainDispatcher dispatcher = new DrainDispatcher(); - ResourceManager rm = new ResourceManager() { - + MockRM rm = new MockRM() { @Override - protected void doSecureLogin() throws IOException { - // Do nothing. + protected ResourceTrackerService createResourceTrackerService() { + // Override MockRM's implementation to not roll both master keys on + // start + return new ResourceTrackerService(getRMContext(), nodesListManager, + this.nmLivelinessMonitor, + getRMContext().getContainerTokenSecretManager(), + getRMContext().getNMTokenSecretManager()); } @Override - protected Dispatcher createDispatcher() { - return dispatcher; - } - @Override - protected void startWepApp() { - // Don't need it, skip. + protected void doSecureLogin() throws IOException { + // Do nothing. } }; rm.init(conf); @@ -107,8 +105,8 @@ protected void startWepApp() { // Testing ContainerToken and NMToken String containerToken = "Container Token : "; String nmToken = "NM Token : "; - - MockNM nm = new MockNM("host:1234", 3072, rm.getResourceTrackerService()); + + MockNM nm = new MockNM("host:1234", 3072, rm); RegisterNodeManagerResponse registrationResponse = nm.registerNode(); MasterKey containerTokenMasterKey = @@ -118,8 +116,6 @@ protected void startWepApp() { MasterKey nmTokenMasterKey = registrationResponse.getNMTokenMasterKey(); Assert.assertNotNull(nmToken + "Registration should cause a key-update!", nmTokenMasterKey); - - dispatcher.await(); NodeHeartbeatResponse response = nm.nodeHeartbeat(true); Assert.assertNull(containerToken + @@ -128,7 +124,6 @@ protected void startWepApp() { Assert.assertNull(nmToken + "First heartbeat after registration shouldn't get any key updates!", response.getNMTokenMasterKey()); - dispatcher.await(); response = nm.nodeHeartbeat(true); Assert.assertNull(containerToken + @@ -137,8 +132,6 @@ protected void startWepApp() { Assert.assertNull(nmToken + "Even second heartbeat after registration shouldn't get any key updates!", response.getContainerTokenMasterKey()); - - dispatcher.await(); // Let's force a roll-over rm.getRMContext().getContainerTokenSecretManager().rollMasterKey(); @@ -161,7 +154,6 @@ protected void startWepApp() { "Roll-over should have incremented the key-id only by one!", nmTokenMasterKey.getKeyId() + 1, response.getNMTokenMasterKey().getKeyId()); - dispatcher.await(); response = nm.nodeHeartbeat(true); Assert.assertNull(containerToken + @@ -170,7 +162,6 @@ protected void startWepApp() { Assert.assertNull(nmToken + "Second heartbeat after roll-over shouldn't get any key updates!", response.getNMTokenMasterKey()); - dispatcher.await(); // Let's force activation rm.getRMContext().getContainerTokenSecretManager().activateNextMasterKey(); @@ -183,7 +174,6 @@ protected void startWepApp() { Assert.assertNull(nmToken + "Activation shouldn't cause any key updates!", response.getNMTokenMasterKey()); - dispatcher.await(); response = nm.nodeHeartbeat(true); Assert.assertNull(containerToken + @@ -192,7 +182,6 @@ protected void startWepApp() { Assert.assertNull(nmToken + "Even second heartbeat after activation shouldn't get any key updates!", response.getNMTokenMasterKey()); - dispatcher.await(); rm.stop(); }