diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java index aad819d..bde5c14 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java @@ -20,23 +20,25 @@ import java.util.List; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerResourceDecrease; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.util.Records; - public abstract class NodeStatus { public static NodeStatus newInstance(NodeId nodeId, int responseId, List containerStatuses, List keepAliveApplications, - NodeHealthStatus nodeHealthStatus) { + NodeHealthStatus nodeHealthStatus, + List decreasedContainers) { NodeStatus nodeStatus = Records.newRecord(NodeStatus.class); nodeStatus.setResponseId(responseId); nodeStatus.setNodeId(nodeId); nodeStatus.setContainersStatuses(containerStatuses); nodeStatus.setKeepAliveApplications(keepAliveApplications); nodeStatus.setNodeHealthStatus(nodeHealthStatus); + nodeStatus.setDecreasedContainers(decreasedContainers); return nodeStatus; } @@ -55,4 +57,9 @@ public abstract void setContainersStatuses( public abstract void setNodeId(NodeId nodeId); public abstract void setResponseId(int responseId); + + public abstract void setDecreasedContainers( + List decreasedContainers); + + public abstract List getDecreasedContainers(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java index 65376dc..c10ce74 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java @@ -24,12 +24,15 @@ import java.util.List; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerResourceDecrease; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ContainerResourceDecreasePBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerResourceDecreaseProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeHealthStatusProto; @@ -48,6 +51,7 @@ private List containers = null; private NodeHealthStatus nodeHealthStatus = null; private List keepAliveApplications = null; + private List newDecreasedContainers = null; public NodeStatusPBImpl() { builder = NodeStatusProto.newBuilder(); @@ -78,6 +82,9 @@ private synchronized void mergeLocalToBuilder() { if (this.keepAliveApplications != null) { addKeepAliveApplicationsToProto(); } + if (this.newDecreasedContainers != null) { + addNewDecreasedContainersToProto(); + } } private synchronized void mergeLocalToProto() { @@ -163,6 +170,43 @@ public void remove() { }; builder.addAllKeepAliveApplications(iterable); } + + private synchronized void addNewDecreasedContainersToProto() { + maybeInitBuilder(); + builder.clearNewDecreasedContainers(); + if (newDecreasedContainers == null) + return; + Iterable iterable = + new Iterable() { + @Override + public Iterator iterator() { + return new Iterator() { + + Iterator iter = newDecreasedContainers + .iterator(); + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public ContainerResourceDecreaseProto next() { + return convertToProtoFormat(iter.next()); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + + } + }; + + } + }; + builder.addAllNewDecreasedContainers(iterable); + } + @Override public int hashCode() { @@ -291,6 +335,36 @@ public synchronized void setNodeHealthStatus(NodeHealthStatus healthStatus) { this.nodeHealthStatus = healthStatus; } + @Override + public synchronized void setDecreasedContainers( + List decreasedContainers) { + if (decreasedContainers == null) { + builder.clearNewDecreasedContainers(); + } + this.newDecreasedContainers = decreasedContainers; + } + + @Override + public synchronized List getDecreasedContainers() { + initDecreasedContainers(); + return this.newDecreasedContainers; + } + + private synchronized void initDecreasedContainers() { + if (this.newDecreasedContainers != null) { + return; + } + NodeStatusProtoOrBuilder p = viaProto ? proto : builder; + List list = + p.getNewDecreasedContainersList(); + this.newDecreasedContainers = new ArrayList(); + + for (ContainerResourceDecreaseProto c : list) { + this.newDecreasedContainers.add(convertFromProtoFormat(c)); + } + + } + private NodeIdProto convertToProtoFormat(NodeId nodeId) { return ((NodeIdPBImpl)nodeId).getProto(); } @@ -316,6 +390,16 @@ private ContainerStatusProto convertToProtoFormat(ContainerStatus c) { return ((ContainerStatusPBImpl)c).getProto(); } + private ContainerResourceDecreaseProto convertToProtoFormat( + ContainerResourceDecrease t) { + return ((ContainerResourceDecreasePBImpl) t).getProto(); + } + + private ContainerResourceDecrease convertFromProtoFormat( + ContainerResourceDecreaseProto p) { + return new ContainerResourceDecreasePBImpl(p); + } + private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto c) { return new ApplicationIdPBImpl(c); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto index 4f5d168..ed93639 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto @@ -36,6 +36,7 @@ message NodeStatusProto { repeated ContainerStatusProto containersStatuses = 3; optional NodeHealthStatusProto nodeHealthStatus = 4; repeated ApplicationIdProto keep_alive_applications = 5; + repeated ContainerResourceDecreaseProto new_decreased_containers = 6; } message MasterKeyProto { @@ -47,4 +48,5 @@ message NodeHealthStatusProto { optional bool is_node_healthy = 1; optional string health_report = 2; optional int64 last_health_report_time = 3; -} \ No newline at end of file +} + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestNodeStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestNodeStatus.java new file mode 100644 index 0000000..06202f3 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestNodeStatus.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.protocolrecords; + +import java.util.ArrayList; +import java.util.List; + +import junit.framework.Assert; + +import org.apache.hadoop.yarn.api.records.ContainerResourceDecrease; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; +import org.apache.hadoop.yarn.server.api.records.impl.pb.NodeStatusPBImpl; +import org.junit.Test; + +public class TestNodeStatus { + @Test + public void testNodeStatusWithDecreasedContainers() { + RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); + NodeStatus nodeStatus = recordFactory.newRecordInstance(NodeStatus.class); + + // add 3 instances + List list = + new ArrayList(); + list.add(ContainerResourceDecrease.newInstance(null, null)); + list.add(ContainerResourceDecrease.newInstance(null, null)); + list.add(ContainerResourceDecrease.newInstance(null, null)); + nodeStatus.setDecreasedContainers(list); + + // serde + NodeStatusProto proto = ((NodeStatusPBImpl) nodeStatus).getProto(); + nodeStatus = new NodeStatusPBImpl(proto); + + // check value + Assert.assertEquals(list.size(), nodeStatus.getDecreasedContainers() + .size()); + } + + @Test + public void testNodeStatusWithoutDecreasedContainers() { + RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); + NodeStatus nodeStatus = recordFactory.newRecordInstance(NodeStatus.class); + + // serde + NodeStatusProto proto = ((NodeStatusPBImpl) nodeStatus).getProto(); + nodeStatus = new NodeStatusPBImpl(proto); + + // check value + Assert.assertEquals(0, nodeStatus.getDecreasedContainers().size()); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java index 729e043..b690d53 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java @@ -18,11 +18,13 @@ package org.apache.hadoop.yarn.server.nodemanager; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentMap; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerResourceDecrease; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; @@ -66,4 +68,6 @@ LocalDirsHandlerService getLocalDirsHandler(); ApplicationACLsManager getApplicationACLsManager(); + + BlockingQueue getDecreasedContainers(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index a169c12..7444a2b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -19,9 +19,11 @@ package org.apache.hadoop.yarn.server.nodemanager; import java.io.IOException; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; @@ -40,6 +42,7 @@ import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerResourceDecrease; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; @@ -242,6 +245,8 @@ public void run() { new ConcurrentHashMap(); private final ConcurrentMap containers = new ConcurrentSkipListMap(); + private final BlockingQueue decreasedContainers = + new LinkedBlockingQueue(); private final NMContainerTokenSecretManager containerTokenSecretManager; private final NMTokenSecretManagerInNM nmTokenSecretManager; @@ -328,6 +333,11 @@ public LocalDirsHandlerService getLocalDirsHandler() { public ApplicationACLsManager getApplicationACLsManager() { return aclsManager; } + + @Override + public BlockingQueue getDecreasedContainers() { + return decreasedContainers; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index aaf6ceb..34689c8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -40,6 +40,7 @@ import org.apache.hadoop.util.VersionUtil; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerResourceDecrease; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; @@ -338,10 +339,21 @@ public NodeStatus getNodeStatusAndUpdateContainersInContext( LOG.debug(this.nodeId + " sending out status for " + containersStatuses.size() + " containers"); NodeStatus nodeStatus = NodeStatus.newInstance(nodeId, responseId, - containersStatuses, createKeepAliveApplicationList(), nodeHealthStatus); + containersStatuses, createKeepAliveApplicationList(), nodeHealthStatus, + pullDecreasedContainers()); return nodeStatus; } + + private List pullDecreasedContainers() { + List decreasedContainers = + new ArrayList(); + ContainerResourceDecrease item = null; + while (null != (item = context.getDecreasedContainers().poll())) { + decreasedContainers.add(item); + } + return decreasedContainers; + } /* * It will return current container statuses. If any container has