diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java index e27d8ca..a0110b6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java @@ -27,12 +27,15 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.DecreasedContainer; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.DecreasedContainerPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase; import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; +import org.apache.hadoop.yarn.proto.YarnProtos.DecreasedContainerProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto; @@ -58,7 +61,9 @@ private MasterKey containerTokenMasterKey = null; private MasterKey nmTokenMasterKey = null; - + + private List containersToDecrease = null; + public NodeHeartbeatResponsePBImpl() { builder = NodeHeartbeatResponseProto.newBuilder(); } @@ -96,6 +101,9 @@ private void mergeLocalToBuilder() { if (this.systemCredentials != null) { addSystemCredentialsToProto(); } + if (this.containersToDecrease != null) { + addContainersToDecreaseToProto(); + } } private void addSystemCredentialsToProto() { @@ -408,6 +416,63 @@ public void remove() { builder.addAllApplicationsToCleanup(iterable); } + private void initContainersToDecrease() { + if (this.containersToDecrease != null) { + return; + } + NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder; + List list = p.getContainersToDecreaseList(); + this.containersToDecrease = new ArrayList(); + + for (DecreasedContainerProto c : list) { + this.containersToDecrease.add(convertFromProtoFormat(c)); + } + } + + @Override + public List getContainersToDecrease() { + initContainersToDecrease(); + return this.containersToDecrease; + } + + @Override + public void addAllContainersToDecrease( + final List containersToDecrease) { + if (containersToDecrease == null) { + return; + } + this.containersToDecrease.addAll(containersToDecrease); + } + + private void addContainersToDecreaseToProto() { + maybeInitBuilder(); + builder.clearContainersToDecrease(); + if (this.containersToDecrease == null) { + return ; + } + Iterable iterable = new + Iterable() { + @Override + public Iterator iterator() { + return new Iterator() { + Iterator iter = containersToDecrease.iterator(); + @Override + public boolean hasNext() { + return iter.hasNext(); + } + @Override + public DecreasedContainerProto next() { + return convertToProtoFormat(iter.next()); + } + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }; + builder.addAllContainersToDecrease(iterable); + } @Override public Map getSystemCredentialsForApps() { @@ -484,6 +549,14 @@ private MasterKeyProto convertToProtoFormat(MasterKey t) { return ((MasterKeyPBImpl) t).getProto(); } + private DecreasedContainerPBImpl convertFromProtoFormat(DecreasedContainerProto p) { + return new DecreasedContainerPBImpl(p); + } + + private DecreasedContainerProto convertToProtoFormat(DecreasedContainer t) { + return ((DecreasedContainerPBImpl) t).getProto(); + } + @Override public boolean getAreNodeLabelsAcceptedByRM() { NodeHeartbeatResponseProtoOrBuilder p = diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java index 1498a0c..49754c7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java @@ -24,6 +24,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.DecreasedContainer; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeAction; @@ -70,4 +71,7 @@ void setSystemCredentialsForApps( boolean getAreNodeLabelsAcceptedByRM(); void setAreNodeLabelsAcceptedByRM(boolean areNodeLabelsAcceptedByRM); + + List getContainersToDecrease(); + void addAllContainersToDecrease(List containersToDecrease); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java index 65376dc..92986a3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java @@ -25,12 +25,15 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.IncreasedContainer; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.IncreasedContainerPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto; +import org.apache.hadoop.yarn.proto.YarnProtos.IncreasedContainerProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeHealthStatusProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto; @@ -48,7 +51,8 @@ private List containers = null; private NodeHealthStatus nodeHealthStatus = null; private List keepAliveApplications = null; - + private List increasedContainers = null; + public NodeStatusPBImpl() { builder = NodeStatusProto.newBuilder(); } @@ -78,6 +82,9 @@ private synchronized void mergeLocalToBuilder() { if (this.keepAliveApplications != null) { addKeepAliveApplicationsToProto(); } + if (this.increasedContainers != null) { + addIncreasedContainersToProto(); + } } private synchronized void mergeLocalToProto() { @@ -164,6 +171,37 @@ public void remove() { builder.addAllKeepAliveApplications(iterable); } + private synchronized void addIncreasedContainersToProto() { + maybeInitBuilder(); + builder.clearIncreasedContainers(); + if (increasedContainers == null) { + return; + } + Iterable iterable = new + Iterable() { + @Override + public Iterator iterator() { + return new Iterator() { + Iterator iter = + increasedContainers.iterator(); + @Override + public boolean hasNext() { + return iter.hasNext(); + } + @Override + public IncreasedContainerProto next() { + return convertToProtoFormat(iter.next()); + } + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }; + builder.addAllIncreasedContainers(iterable); + } + @Override public int hashCode() { return getProto().hashCode(); @@ -291,6 +329,28 @@ public synchronized void setNodeHealthStatus(NodeHealthStatus healthStatus) { this.nodeHealthStatus = healthStatus; } + @Override + public synchronized List getIncreasedContainers() { + if (increasedContainers != null) { + return increasedContainers; + } + NodeStatusProtoOrBuilder p = viaProto ? proto : builder; + List list = p.getIncreasedContainersList(); + this.increasedContainers = new ArrayList<>(); + for (IncreasedContainerProto c : list) { + this.increasedContainers.add(convertFromProtoFormat(c)); + } + return this.increasedContainers; + } + + @Override + public synchronized void setIncreasedContainers( + List increasedContainers) { + if (increasedContainers == null) + builder.clearIncreasedContainers(); + this.increasedContainers = increasedContainers; + } + private NodeIdProto convertToProtoFormat(NodeId nodeId) { return ((NodeIdPBImpl)nodeId).getProto(); } @@ -323,4 +383,12 @@ private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto c) { private ApplicationIdProto convertToProtoFormat(ApplicationId c) { return ((ApplicationIdPBImpl)c).getProto(); } + + private IncreasedContainerPBImpl convertFromProtoFormat(IncreasedContainerProto c) { + return new IncreasedContainerPBImpl(c); + } + + private IncreasedContainerProto convertToProtoFormat(IncreasedContainer c) { + return ((IncreasedContainerPBImpl)c).getProto(); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java index aad819d..186ce21 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java @@ -21,6 +21,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.IncreasedContainer; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.util.Records; @@ -55,4 +56,8 @@ public abstract void setContainersStatuses( public abstract void setNodeId(NodeId nodeId); public abstract void setResponseId(int responseId); + + public abstract List getIncreasedContainers(); + public abstract void setIncreasedContainers( + List increasedContainers); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto index 99149ac..bf50f56 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto @@ -36,6 +36,7 @@ message NodeStatusProto { repeated ContainerStatusProto containersStatuses = 3; optional NodeHealthStatusProto nodeHealthStatus = 4; repeated ApplicationIdProto keep_alive_applications = 5; + repeated IncreasedContainerProto increased_containers = 6; } message MasterKeyProto { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index c122b2a..d0b551a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -82,6 +82,7 @@ message NodeHeartbeatResponseProto { repeated ContainerIdProto containers_to_be_removed_from_nm = 9; repeated SystemCredentialsForAppsProto system_credentials_for_apps = 10; optional bool areNodeLabelsAcceptedByRM = 11 [default = false]; + repeated DecreasedContainerProto containers_to_decrease = 12; } message SystemCredentialsForAppsProto { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/CMgrDecreaseContainersResourceEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/CMgrDecreaseContainersResourceEvent.java new file mode 100644 index 0000000..9157328 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/CMgrDecreaseContainersResourceEvent.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager; + + +import org.apache.hadoop.yarn.api.records.DecreasedContainer; +import java.util.List; + +public class CMgrDecreaseContainersResourceEvent extends ContainerManagerEvent { + + private final List containersToDecrease; + + public CMgrDecreaseContainersResourceEvent(List + containersToDecrease) { + super(ContainerManagerEventType.DECREASE_CONTAINERS_RESOURCE); + this.containersToDecrease = containersToDecrease; + } + + public List getContainersToDecrease() { + return this.containersToDecrease; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 3721b0e..be639e9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -49,6 +49,8 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.DecreasedContainer; +import org.apache.hadoop.yarn.api.records.IncreasedContainer; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.api.records.Resource; @@ -782,6 +784,14 @@ public void run() { ((NMContext) context) .setSystemCrendentialsForApps(parseCredentials(systemCredentials)); } + + List containersToDecrease = + response.getContainersToDecrease(); + if (!containersToDecrease.isEmpty()) { + dispatcher.getEventHandler().handle( + new CMgrDecreaseContainersResourceEvent(containersToDecrease) + ); + } } catch (ConnectException e) { //catch and throw the exception if tried MAX wait time to connect RM dispatcher.getEventHandler().handle( diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index bc48adf..4001fa6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -1628,7 +1628,7 @@ public static ContainerStatus createContainerStatus(int id, ContainerStatus containerStatus = BuilderUtils.newContainerStatus(contaierId, containerState, "test_containerStatus: id=" + id + ", containerState: " - + containerState, 0); + + containerState, 0, Resource.newInstance(1024, 1)); return containerStatus; }