diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java index c51570c..6cdf87f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java @@ -68,7 +68,7 @@ public void testResourceTrackerOnHA() throws Exception { failoverThread = createAndStartFailoverThread(); NodeStatus status = NodeStatus.newInstance(NodeId.newInstance("localhost", 0), 0, null, - null, null, null); + null, null, null, null); NodeHeartbeatRequest request2 = NodeHeartbeatRequest.newInstance(status, null, null,null); resourceTracker.nodeHeartbeat(request2); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java index 1498a0c..38fbc82 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java @@ -24,6 +24,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeAction; @@ -70,4 +71,7 @@ void setSystemCredentialsForApps( boolean getAreNodeLabelsAcceptedByRM(); void setAreNodeLabelsAcceptedByRM(boolean areNodeLabelsAcceptedByRM); + + List getContainersToDecrease(); + void addAllContainersToDecrease(List containersToDecrease); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java index e27d8ca..12c5230 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java @@ -27,12 +27,15 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase; import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto; @@ -58,7 +61,9 @@ private MasterKey containerTokenMasterKey = null; private MasterKey nmTokenMasterKey = null; - + + private List containersToDecrease = null; + public NodeHeartbeatResponsePBImpl() { builder = NodeHeartbeatResponseProto.newBuilder(); } @@ -96,6 +101,9 @@ private void mergeLocalToBuilder() { if (this.systemCredentials != null) { addSystemCredentialsToProto(); } + if (this.containersToDecrease != null) { + addContainersToDecreaseToProto(); + } } private void addSystemCredentialsToProto() { @@ -408,6 +416,64 @@ public void remove() { builder.addAllApplicationsToCleanup(iterable); } + private void initContainersToDecrease() { + if (this.containersToDecrease != null) { + return; + } + NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder; + List list = p.getContainersToDecreaseList(); + this.containersToDecrease = new ArrayList<>(); + + for (ContainerProto c : list) { + this.containersToDecrease.add(convertFromProtoFormat(c)); + } + } + + @Override + public List getContainersToDecrease() { + initContainersToDecrease(); + return this.containersToDecrease; + } + + @Override + public void addAllContainersToDecrease( + final List containersToDecrease) { + if (containersToDecrease == null) { + return; + } + initContainersToDecrease(); + this.containersToDecrease.addAll(containersToDecrease); + } + + private void addContainersToDecreaseToProto() { + maybeInitBuilder(); + builder.clearContainersToDecrease(); + if (this.containersToDecrease == null) { + return; + } + Iterable iterable = new + Iterable() { + @Override + public Iterator iterator() { + return new Iterator() { + private Iterator iter = containersToDecrease.iterator(); + @Override + public boolean hasNext() { + return iter.hasNext(); + } + @Override + public ContainerProto next() { + return convertToProtoFormat(iter.next()); + } + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }; + builder.addAllContainersToDecrease(iterable); + } @Override public Map getSystemCredentialsForApps() { @@ -484,6 +550,14 @@ private MasterKeyProto convertToProtoFormat(MasterKey t) { return ((MasterKeyPBImpl) t).getProto(); } + private ContainerPBImpl convertFromProtoFormat(ContainerProto p) { + return new ContainerPBImpl(p); + } + + private ContainerProto convertToProtoFormat(Container t) { + return ((ContainerPBImpl) t).getProto(); + } + @Override public boolean getAreNodeLabelsAcceptedByRM() { NodeHeartbeatResponseProtoOrBuilder p = diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java index 38b0381..b8f7f15 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java @@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.util.Records; @@ -47,13 +48,15 @@ * @param keepAliveApplications Applications to keep alive. * @param nodeHealthStatus Health status of the node. * @param containersUtilizations Utilization of the containers in this node. + * @param increasedContainers Containers whose resource has been increased. * @return New {@code NodeStatus} with the provided information. */ public static NodeStatus newInstance(NodeId nodeId, int responseId, List containerStatuses, List keepAliveApplications, NodeHealthStatus nodeHealthStatus, - ResourceUtilization containersUtilization) { + ResourceUtilization containersUtilization, + List increasedContainers) { NodeStatus nodeStatus = Records.newRecord(NodeStatus.class); nodeStatus.setResponseId(responseId); nodeStatus.setNodeId(nodeId); @@ -61,6 +64,7 @@ public static NodeStatus newInstance(NodeId nodeId, int responseId, nodeStatus.setKeepAliveApplications(keepAliveApplications); nodeStatus.setNodeHealthStatus(nodeHealthStatus); nodeStatus.setContainersUtilization(containersUtilization); + nodeStatus.setIncreasedContainers(increasedContainers); return nodeStatus; } @@ -92,4 +96,8 @@ public abstract void setContainersStatuses( @Unstable public abstract void setContainersUtilization( ResourceUtilization containersUtilization); + + public abstract List getIncreasedContainers(); + public abstract void setIncreasedContainers( + List increasedContainers); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java index fffd6a9..ed33691 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java @@ -24,13 +24,16 @@ import java.util.List; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeHealthStatusProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto; @@ -49,7 +52,8 @@ private List containers = null; private NodeHealthStatus nodeHealthStatus = null; private List keepAliveApplications = null; - + private List increasedContainers = null; + public NodeStatusPBImpl() { builder = NodeStatusProto.newBuilder(); } @@ -79,6 +83,9 @@ private synchronized void mergeLocalToBuilder() { if (this.keepAliveApplications != null) { addKeepAliveApplicationsToProto(); } + if (this.increasedContainers != null) { + addIncreasedContainersToProto(); + } } private synchronized void mergeLocalToProto() { @@ -165,6 +172,37 @@ public void remove() { builder.addAllKeepAliveApplications(iterable); } + private synchronized void addIncreasedContainersToProto() { + maybeInitBuilder(); + builder.clearIncreasedContainers(); + if (increasedContainers == null) { + return; + } + Iterable iterable = new + Iterable() { + @Override + public Iterator iterator() { + return new Iterator() { + private Iterator iter = + increasedContainers.iterator(); + @Override + public boolean hasNext() { + return iter.hasNext(); + } + @Override + public ContainerProto next() { + return convertToProtoFormat(iter.next()); + } + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }; + builder.addAllIncreasedContainers(iterable); + } + @Override public int hashCode() { return getProto().hashCode(); @@ -314,6 +352,31 @@ public void setContainersUtilization( .setContainersUtilization(convertToProtoFormat(containersUtilization)); } + @Override + public synchronized List getIncreasedContainers() { + if (increasedContainers != null) { + return increasedContainers; + } + NodeStatusProtoOrBuilder p = viaProto ? proto : builder; + List list = p.getIncreasedContainersList(); + this.increasedContainers = new ArrayList<>(); + for (ContainerProto c : list) { + this.increasedContainers.add(convertFromProtoFormat(c)); + } + return this.increasedContainers; + } + + @Override + public synchronized void setIncreasedContainers( + List increasedContainers) { + maybeInitBuilder(); + if (increasedContainers == null) { + builder.clearIncreasedContainers(); + return; + } + this.increasedContainers = increasedContainers; + } + private NodeIdProto convertToProtoFormat(NodeId nodeId) { return ((NodeIdPBImpl)nodeId).getProto(); } @@ -355,4 +418,14 @@ private ResourceUtilizationPBImpl convertFromProtoFormat( ResourceUtilizationProto p) { return new ResourceUtilizationPBImpl(p); } + + private ContainerPBImpl convertFromProtoFormat( + ContainerProto c) { + return new ContainerPBImpl(c); + } + + private ContainerProto convertToProtoFormat( + Container c) { + return ((ContainerPBImpl)c).getProto(); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto index a810813..5dfd488 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto @@ -37,6 +37,7 @@ message NodeStatusProto { optional NodeHealthStatusProto nodeHealthStatus = 4; repeated ApplicationIdProto keep_alive_applications = 5; optional ResourceUtilizationProto containers_utilization = 6; + repeated ContainerProto increased_containers = 7; } message MasterKeyProto { @@ -59,4 +60,4 @@ message ResourceUtilizationProto { optional int32 pmem = 1; optional int32 vmem = 2; optional float cpu = 3; -} \ No newline at end of file +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index c122b2a..2db8919 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -82,6 +82,7 @@ message NodeHeartbeatResponseProto { repeated ContainerIdProto containers_to_be_removed_from_nm = 9; repeated SystemCredentialsForAppsProto system_credentials_for_apps = 10; optional bool areNodeLabelsAcceptedByRM = 11 [default = false]; + repeated ContainerProto containers_to_decrease = 12; } message SystemCredentialsForAppsProto { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java index d9eeb9d..c9427dd 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java @@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; @@ -168,6 +169,20 @@ public void testNodeHeartbeatResponsePBImplWithRMAcceptLbls() { assertTrue(copy.getAreNodeLabelsAcceptedByRM()); } + @Test + public void testNodeHeartbeatResponsePBImplWithDecreasedContainers() { + NodeHeartbeatResponsePBImpl original = new NodeHeartbeatResponsePBImpl(); + original.addAllContainersToDecrease( + Arrays.asList(getDecreasedContainer(1, 2, 2048, 2), + getDecreasedContainer(2, 3, 1024, 1))); + NodeHeartbeatResponsePBImpl copy = + new NodeHeartbeatResponsePBImpl(original.getProto()); + assertEquals(1, copy.getContainersToDecrease().get(0) + .getId().getContainerId()); + assertEquals(1024, copy.getContainersToDecrease().get(1) + .getResource().getMemory()); + } + /** * Test RegisterNodeManagerRequestPBImpl. */ @@ -244,6 +259,9 @@ public void testNodeStatusPBImpl() { original.setNodeHealthStatus(getNodeHealthStatus()); original.setNodeId(getNodeId()); original.setResponseId(1); + original.setIncreasedContainers( + Arrays.asList(getIncreasedContainer(1, 2, 2048, 2), + getIncreasedContainer(2, 3, 4096, 3))); NodeStatusPBImpl copy = new NodeStatusPBImpl(original.getProto()); assertEquals(3L, copy.getContainersStatuses().get(1).getContainerId() @@ -252,7 +270,10 @@ public void testNodeStatusPBImpl() { assertEquals(1000, copy.getNodeHealthStatus().getLastHealthReportTime()); assertEquals(9090, copy.getNodeId().getPort()); assertEquals(1, copy.getResponseId()); - + assertEquals(1, copy.getIncreasedContainers().get(0) + .getId().getContainerId()); + assertEquals(4096, copy.getIncreasedContainers().get(1) + .getResource().getMemory()); } @Test @@ -347,6 +368,22 @@ public ApplicationIdPBImpl setParameters(int id, long timestamp) { return new ApplicationIdPBImpl(appId.getProto()); } + private Container getDecreasedContainer(int containerID, + int appAttemptId, int memory, int vCores) { + ContainerId containerId = getContainerId(containerID, appAttemptId); + Resource capability = Resource.newInstance(memory, vCores); + return Container.newInstance( + containerId, null, null, capability, null, null); + } + + private Container getIncreasedContainer(int containerID, + int appAttemptId, int memory, int vCores) { + ContainerId containerId = getContainerId(containerID, appAttemptId); + Resource capability = Resource.newInstance(memory, vCores); + return Container.newInstance( + containerId, null, null, capability, null, null); + } + private NodeStatus getNodeStatus() { NodeStatus status = recordFactory.newRecordInstance(NodeStatus.class); status.setContainersStatuses(new ArrayList()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java index 42a4234..25934d5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java @@ -62,6 +62,9 @@ ConcurrentMap getContainers(); + ConcurrentMap + getIncreasedContainers(); + NMContainerTokenSecretManager getContainerTokenSecretManager(); NMTokenSecretManagerInNM getNMTokenSecretManager(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index 185ba12..7f378f4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -396,6 +396,10 @@ public void run() { protected final ConcurrentMap containers = new ConcurrentSkipListMap(); + protected final ConcurrentMap increasedContainers = + new ConcurrentHashMap<>(); + private final NMContainerTokenSecretManager containerTokenSecretManager; private final NMTokenSecretManagerInNM nmTokenSecretManager; private ContainerManagementProtocol containerManager; @@ -449,6 +453,12 @@ public int getHttpPort() { } @Override + public ConcurrentMap + getIncreasedContainers() { + return this.increasedContainers; + } + + @Override public NMContainerTokenSecretManager getContainerTokenSecretManager() { return this.containerTokenSecretManager; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 30a2bd5..def37fb 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -431,10 +431,12 @@ private NodeStatus getNodeStatus(int responseId) throws IOException { } List containersStatuses = getContainerStatuses(); ResourceUtilization containersUtilization = getContainersUtilization(); + List increasedContainers + = getIncreasedContainers(); NodeStatus nodeStatus = NodeStatus.newInstance(nodeId, responseId, containersStatuses, createKeepAliveApplicationList(), nodeHealthStatus, - containersUtilization); + containersUtilization, increasedContainers); return nodeStatus; } @@ -451,6 +453,21 @@ private ResourceUtilization getContainersUtilization() { return containersMonitor.getContainersUtilization(); } + /* Get the containers whose resource has been increased since last + * NM-RM heartbeat. + */ + private List + getIncreasedContainers() { + List + increasedContainers = new ArrayList<>( + this.context.getIncreasedContainers().values()); + for (org.apache.hadoop.yarn.api.records.Container + container : increasedContainers) { + this.context.getIncreasedContainers().remove(container.getId()); + } + return increasedContainers; + } + // Iterate through the NMContext and clone and get all the containers' // statuses. If it's a completed container, add into the // recentlyStoppedContainers collections. @@ -797,6 +814,14 @@ public void run() { ((NMContext) context) .setSystemCrendentialsForApps(parseCredentials(systemCredentials)); } + + List + containersToDecrease = response.getContainersToDecrease(); + if (!containersToDecrease.isEmpty()) { + dispatcher.getEventHandler().handle( + new CMgrDecreaseContainersResourceEvent(containersToDecrease) + ); + } } catch (ConnectException e) { //catch and throw the exception if tried MAX wait time to connect RM dispatcher.getEventHandler().handle( diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index d7f35b3..0695f98 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -1047,6 +1047,13 @@ private synchronized void changeContainerResourceInternal( + " is not smaller than the current resource " + currentResource.toString()); } + if (increase) { + org.apache.hadoop.yarn.api.records.Container increasedContainer = + org.apache.hadoop.yarn.api.records.Container.newInstance( + containerId, null, null, targetResource, null, null); + context.getIncreasedContainers().put( + containerId, increasedContainer); + } this.readLock.lock(); try { if (!serviceStopped) {