diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 8022a07e17c..2db5b4e9815 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -80,6 +80,7 @@ import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus; import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; @@ -582,6 +583,11 @@ private ResourceUtilization getNodeUtilization() { private void updateNMResource(Resource resource) { metrics.addResource(Resources.subtract(resource, totalResource)); this.totalResource = resource; + + // Update the containers monitor + ContainersMonitor containersMonitor = + this.context.getContainerManager().getContainersMonitor(); + containersMonitor.setAllocatedResourcesForContainers(totalResource); } // Iterate through the NMContext and clone and get all the containers' diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java index daecc288796..c982c03144f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java @@ -64,4 +64,10 @@ static void decreaseResourceUtilization( * containersMonitor.getVmemRatio()); resourceUtil.subtractFrom((int)resource.getMemorySize(), vmem, vCores); } + + /** + * Set the allocated resources for containers. + * @param resource Resources allocated for the containers. 
+ */ + void setAllocatedResourcesForContainers(final Resource resource); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java index b46e6204b9d..a7d2b158dd3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java @@ -155,23 +155,21 @@ protected void serviceInit(Configuration myConf) throws Exception { NodeManagerHardwareUtils.getContainerMemoryMB( this.resourceCalculatorPlugin, this.conf) * 1024 * 1024L; - long configuredVCoresForContainers = + int configuredVCoresForContainers = NodeManagerHardwareUtils.getVCores(this.resourceCalculatorPlugin, this.conf); - // Setting these irrespective of whether checks are enabled. Required in - // the UI. - // ///////// Physical memory configuration ////// - this.maxPmemAllottedForContainers = configuredPMemForContainers; - this.maxVCoresAllottedForContainers = configuredVCoresForContainers; - // ///////// Virtual memory configuration ////// vmemRatio = this.conf.getFloat(YarnConfiguration.NM_VMEM_PMEM_RATIO, YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO); Preconditions.checkArgument(vmemRatio > 0.99f, YarnConfiguration.NM_VMEM_PMEM_RATIO + " should be at least 1.0"); - this.maxVmemAllottedForContainers = - (long) (vmemRatio * configuredPMemForContainers); + + // Setting these irrespective of whether checks are enabled. + // Required in the UI. 
+ Resource resourcesForContainers = Resource.newInstance( + configuredPMemForContainers, configuredVCoresForContainers); + setAllocatedResourcesForContainers(resourcesForContainers); pmemCheckEnabled = this.conf.getBoolean( YarnConfiguration.NM_PMEM_CHECK_ENABLED, @@ -908,6 +906,14 @@ public long getVCoresAllocatedForContainers() { return this.maxVCoresAllottedForContainers; } + @Override + public void setAllocatedResourcesForContainers(final Resource resource) { + this.maxVCoresAllottedForContainers = resource.getVirtualCores(); + this.maxPmemAllottedForContainers = resource.getMemorySize(); + this.maxVmemAllottedForContainers = + (long) (getVmemRatio() * maxPmemAllottedForContainers); + } + /** * Is the total virtual memory check enabled? * diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index 5ba0bef763c..9974d4f39e2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -19,8 +19,10 @@ package org.apache.hadoop.yarn.server.nodemanager; import static org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils.newNodeHeartbeatResponse; +import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.mockito.Matchers.any; import java.io.EOFException; import java.io.File; @@ -57,6 +59,7 @@ import org.apache.hadoop.security.Credentials; import 
org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier; import org.apache.hadoop.service.Service.STATE; +import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.service.ServiceOperations; import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; @@ -78,9 +81,14 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.impl.pb.RpcClientFactoryPBImpl; import org.apache.hadoop.yarn.factories.impl.pb.RpcServerFactoryPBImpl; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerResponseProto; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.ResourceTracker; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; @@ -90,17 +98,20 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatResponsePBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterNodeManagerResponsePBImpl; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeStatus; import 
org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; @@ -1774,6 +1785,97 @@ public void run() { Assert.assertTrue("Test failed with exception(s)" + exceptions, exceptions.isEmpty()); } + + @Test + public void testUpdateNMResources() throws Exception { + + // The resource set for the Node Manager from the Resource Tracker + final Resource resource = Resource.newInstance( + 8L * 1024 * 1024 * 1024, 1); + + // Start the mock Resource Tracker server + Server rt = getMockResourceTracker(resource); + rt.start(); + + // Start the NodeManager + NodeManager nm = new NodeManager(); + YarnConfiguration conf = new YarnConfiguration(); + conf.set(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, + "0.0.0.0:" + rt.getPort()); + conf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS, "0.0.0.0:0"); + nm.init(conf); + nm.start(); + + // Check for the initial size of the Node Manager + ContainerManager containerManager = nm.getContainerManager(); + 
nm.getNMContext().getContainerTokenSecretManager(); + ContainersMonitor containerMonitor = + containerManager.getContainersMonitor(); + assertEquals(8, containerMonitor.getVCoresAllocatedForContainers()); + assertEquals(8L * 1024 * 1024 * 1024, + containerMonitor.getPmemAllocatedForContainers()); + + // Wait for the first heartbeat to propagate the initial resource change + GenericTestUtils.waitFor( + () -> containerMonitor.getVCoresAllocatedForContainers() == 1, + 100, 2 * 1000); + assertEquals(8L * 1024 * 1024 * 1024, + containerMonitor.getPmemAllocatedForContainers()); + + // Change the value from the Resource Tracker + resource.setVirtualCores(5); + GenericTestUtils.waitFor( + () -> containerMonitor.getVCoresAllocatedForContainers() == 5, + 100, 2 * 1000); + assertEquals(8L * 1024 * 1024 * 1024, + containerMonitor.getPmemAllocatedForContainers()); + + // Cleanup + nm.stop(); + nm.close(); + rt.stop(); + } + + /** + * Create a mock Resource Tracker server that returns the resources we want + * in the heartbeat. + * @param resource Resource to reply in the heartbeat. + * @return RPC server for the Resource Tracker. + * @throws Exception If it cannot create the Resource Tracker. 
+ */ + private static Server getMockResourceTracker(final Resource resource) + throws Exception { + + // Setup the mock Resource Tracker + ResourceTracker rt = mock(ResourceTracker.class); + RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); + when(rt.registerNodeManager(any())).thenAnswer(invocation -> { + RegisterNodeManagerResponse response = recordFactory.newRecordInstance( + RegisterNodeManagerResponse.class); + response.setContainerTokenMasterKey(createMasterKey()); + response.setNMTokenMasterKey(createMasterKey()); + return response; + }); + when(rt.nodeHeartbeat(any())).thenAnswer(invocation -> { + NodeHeartbeatResponse response = recordFactory.newRecordInstance( + NodeHeartbeatResponse.class); + response.setResource(resource); + return response; + }); + when(rt.unRegisterNodeManager(any())).thenAnswer(invocation -> { + UnRegisterNodeManagerResponse response = recordFactory.newRecordInstance( + UnRegisterNodeManagerResponse.class); + return response; + }); + + // Get the RPC server + YarnConfiguration conf = new YarnConfiguration(); + YarnRPC rpc = YarnRPC.create(conf); + Server server = rpc.getServer(ResourceTracker.class, rt, + new InetSocketAddress("0.0.0.0", 0), conf, null, 1); + return server; + } + // Add new containers info into NM context each time node heart beats. private class MyNMContext extends NMContext {