diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java index dd1060c..0736b10 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java @@ -54,6 +54,8 @@ import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; @@ -71,6 +73,7 @@ import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy; import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy.ContainerManagementProtocolProxyData; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC; @@ -438,5 +441,12 @@ public StopContainersResponse stopContainers(StopContainersRequest request) "Dummy function cause")); throw new IOException(e); } + + @Override + public ChangeContainersResourceResponse changeContainersResource( + ChangeContainersResourceRequest request) throws YarnException, + IOException { + return null; + } } } \ No newline at end of file diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java index 6f21c87..1f21568 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java @@ -45,6 +45,8 @@ import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher.EventType; import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; @@ -443,6 +445,12 @@ public GetContainerStatusesResponse getContainerStatuses( GetContainerStatusesRequest request) throws IOException { return null; } + @Override + public ChangeContainersResourceResponse changeContainersResource( + ChangeContainersResourceRequest request) throws YarnException, + IOException { + return null; + } } @SuppressWarnings("serial") diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ContainerManagementProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ContainerManagementProtocol.java index 7aa43df..8d84789 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ContainerManagementProtocol.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ContainerManagementProtocol.java @@ -22,6 +22,8 @@ import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; @@ -170,4 +172,16 @@ StopContainersResponse stopContainers(StopContainersRequest request) GetContainerStatusesResponse getContainerStatuses( GetContainerStatusesRequest request) throws YarnException, IOException; + + /** + * The API used by the ApplicationMaster to request change + * running containers in this node + * + * @throws YarnException + * @throws IOException + */ + @Public + ChangeContainersResourceResponse changeContainersResource( + ChangeContainersResourceRequest request) throws YarnException, + IOException; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ChangeContainersResourceRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ChangeContainersResourceRequest.java new file mode 100644 index 0000000..ab37f80 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ChangeContainersResourceRequest.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords; + +import java.util.List; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.yarn.api.ContainerManagementProtocol; +import org.apache.hadoop.yarn.api.records.ContainerResourceDecrease; +import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.util.Records; + +/** + *

+ * The request sent by Application Master to ask + * Node Manager change resource quota used by a specified container + *

+ * + * @see ContainerManagementProtocol#changeContainersResource(ChangeContainersResourceRequest) + */ +@Public +@Stable +public abstract class ChangeContainersResourceRequest { + @Public + @Stable + public static ChangeContainersResourceRequest newInstance( + List containersTokensToIncrease, + List containersToDecrease) { + ChangeContainersResourceRequest request = + Records.newRecord(ChangeContainersResourceRequest.class); + request.setContainersToIncrease(containersTokensToIncrease); + request.setContainersToDecrease(containersToDecrease); + return request; + } + + /** + * get tokens of container need to be increased. + */ + @Public + @Stable + public abstract List getContainersToIncrease(); + + /** + * set tokens of containers need to be increased, token is acquired in + * AllocateResponse.getIncreasedContainers. Here we only need + * the token in ContainerResourceIncrease because container id + * and capability are already contained in token + */ + @Public + @Stable + public abstract void setContainersToIncrease( + List containersTokensToIncrease); + + /** + * get decrease containers sent by ApplicationMaster + */ + @Public + @Stable + public abstract List getContainersToDecrease(); + + /** + * set containers need to be decreased by ApplicationMaster to + * NodeManager + */ + @Public + @Stable + public abstract void setContainersToDecrease( + List containersToDecrease); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ChangeContainersResourceResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ChangeContainersResourceResponse.java new file mode 100644 index 0000000..9bd7708 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ChangeContainersResourceResponse.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords; + +import java.util.List; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.util.Records; + +@Public +@Stable +public abstract class ChangeContainersResourceResponse { + @Public + @Stable + public static ChangeContainersResourceResponse newInstance( + List succeedChangedContainers, + List failedChangedContainers) { + ChangeContainersResourceResponse request = + Records.newRecord(ChangeContainersResourceResponse.class); + request.setSucceedChangedContainers(succeedChangedContainers); + request.setFailedChangedContainers(failedChangedContainers); + return request; + } + + @Public + @Stable + public abstract List getSucceedChangedContainers(); + + @Public + @Stable + public abstract void setSucceedChangedContainers( + List succeedIncreasedContainers); + + @Public + @Stable + public abstract List getFailedChangedContainers(); + + @Public + @Stable + public abstract void setFailedChangedContainers( + List succeedIncreasedContainers); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/containermanagement_protocol.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/containermanagement_protocol.proto index 7b1647b..dd7874a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/containermanagement_protocol.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/containermanagement_protocol.proto @@ -34,4 +34,5 @@ service ContainerManagementProtocolService { rpc startContainers(StartContainersRequestProto) returns (StartContainersResponseProto); rpc stopContainers(StopContainersRequestProto) returns (StopContainersResponseProto); rpc getContainerStatuses(GetContainerStatusesRequestProto) returns (GetContainerStatusesResponseProto); + rpc changeContainersResource(ChangeContainersResourceRequestProto) returns (ChangeContainersResourceResponseProto); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index 332be81..e8f902d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -229,3 +229,13 @@ message GetContainerStatusesResponseProto { repeated ContainerStatusProto status = 1; repeated ContainerExceptionMapProto failed_requests = 2; } + +message ChangeContainersResourceRequestProto { + repeated hadoop.common.TokenProto increase_containers = 1; + repeated ContainerResourceDecreaseProto decrease_containers = 2; +} + +message ChangeContainersResourceResponseProto { + repeated ContainerIdProto succeed_changed_containers = 1; + repeated ContainerIdProto failed_changed_containers = 2; +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagementProtocolPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagementProtocolPBClientImpl.java index 15397e3..63e3612 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagementProtocolPBClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagementProtocolPBClientImpl.java @@ -30,12 +30,16 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.ContainerManagementProtocolPB; +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ChangeContainersResourceRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ChangeContainersResourceResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerStatusesRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerStatusesResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainersRequestPBImpl; @@ -45,6 +49,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.ipc.RPCUtil; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.ChangeContainersResourceRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerStatusesRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainersRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.StopContainersRequestProto; @@ -128,4 +133,17 @@ public GetContainerStatusesResponse getContainerStatuses( return null; } } + + @Override + public ChangeContainersResourceResponse changeContainersResource( + ChangeContainersResourceRequest request) throws YarnException, + IOException { + ChangeContainersResourceRequestProto requestProto = ((ChangeContainersResourceRequestPBImpl)request).getProto(); + try { + return new ChangeContainersResourceResponsePBImpl(proxy.changeContainersResource(null, requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + return null; + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ContainerManagementProtocolPBServiceImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ContainerManagementProtocolPBServiceImpl.java index 2d33e69..55a9d33 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ContainerManagementProtocolPBServiceImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ContainerManagementProtocolPBServiceImpl.java @@ -23,9 +23,12 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.ContainerManagementProtocolPB; +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ChangeContainersResourceRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ChangeContainersResourceResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerStatusesRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerStatusesResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainersRequestPBImpl; @@ -33,6 +36,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StopContainersRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StopContainersResponsePBImpl; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.ChangeContainersResourceRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.ChangeContainersResourceResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerStatusesRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerStatusesResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainersRequestProto; @@ -94,4 +99,19 @@ public GetContainerStatusesResponseProto getContainerStatuses( throw new ServiceException(e); } } + + @Override + public ChangeContainersResourceResponseProto changeContainersResource( + RpcController controller, ChangeContainersResourceRequestProto proto) + throws ServiceException { + ChangeContainersResourceRequestPBImpl request = new ChangeContainersResourceRequestPBImpl(proto); + try { + ChangeContainersResourceResponse response = real.changeContainersResource(request); + return ((ChangeContainersResourceResponsePBImpl)response).getProto(); + } catch (YarnException e) { + throw new ServiceException(e); + } catch (IOException e) { + throw new ServiceException(e); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/ChangeContainersResourceRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/ChangeContainersResourceRequestPBImpl.java new file mode 100644 index 0000000..45b5f89 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/ChangeContainersResourceRequestPBImpl.java @@ -0,0 +1,258 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.security.proto.SecurityProtos.TokenProto; +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceRequest; +import org.apache.hadoop.yarn.api.records.ContainerResourceDecrease; +import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.api.records.impl.pb.ContainerResourceDecreasePBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerResourceDecreaseProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.ChangeContainersResourceRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.ChangeContainersResourceRequestProtoOrBuilder; + +import com.google.protobuf.TextFormat; + +@Private +@Unstable +public class ChangeContainersResourceRequestPBImpl extends + ChangeContainersResourceRequest { + ChangeContainersResourceRequestProto proto = + ChangeContainersResourceRequestProto.getDefaultInstance(); + ChangeContainersResourceRequestProto.Builder builder = null; + boolean viaProto = false; + + private List containersToIncrease = null; + private List containersToDecrease = null; + + public ChangeContainersResourceRequestPBImpl() { + builder = ChangeContainersResourceRequestProto.newBuilder(); + } + + public ChangeContainersResourceRequestPBImpl( + ChangeContainersResourceRequestProto proto) { + this.proto = proto; + viaProto = true; + } + + public ChangeContainersResourceRequestProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) { + return false; + } + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + + private void mergeLocalToBuilder() { + if (this.containersToIncrease != null) { + addIncreaseContainersToProto(); + } + if (this.containersToDecrease != null) { + addDecreaseContainersToProto(); + } + } + + private void mergeLocalToProto() { + if (viaProto) { + maybeInitBuilder(); + } + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = ChangeContainersResourceRequestProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public List getContainersToIncrease() { + initContainersToIncrease(); + return this.containersToIncrease; + } + + @Override + public void setContainersToIncrease(List containersToIncrease) { + if (containersToIncrease == null) { + return; + } + initContainersToIncrease(); + this.containersToIncrease.clear(); + this.containersToIncrease.addAll(containersToIncrease); + } + + @Override + public List getContainersToDecrease() { + initContainersToDecrease(); + return this.containersToDecrease; + } + + @Override + public void setContainersToDecrease( + List containersToDecrease) { + if (containersToDecrease == null) { + return; + } + initContainersToDecrease(); + this.containersToDecrease.clear(); + this.containersToDecrease.addAll(containersToDecrease); + } + + private void initContainersToIncrease() { + if (this.containersToIncrease != null) { + return; + } + ChangeContainersResourceRequestProtoOrBuilder p = + viaProto ? proto : builder; + List list = p.getIncreaseContainersList(); + this.containersToIncrease = new ArrayList(); + + for (TokenProto c : list) { + this.containersToIncrease.add(convertFromProtoFormat(c)); + } + } + + private void addIncreaseContainersToProto() { + maybeInitBuilder(); + builder.clearIncreaseContainers(); + if (this.containersToIncrease == null) { + return; + } + Iterable iterable = new Iterable() { + @Override + public Iterator iterator() { + return new Iterator() { + Iterator iter = containersToIncrease.iterator(); + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public TokenProto next() { + return convertToProtoFormat(iter.next()); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }; + builder.addAllIncreaseContainers(iterable); + } + + private void initContainersToDecrease() { + if (this.containersToDecrease != null) { + return; + } + ChangeContainersResourceRequestProtoOrBuilder p = + viaProto ? proto : builder; + List list = p.getDecreaseContainersList(); + this.containersToDecrease = new ArrayList(); + + for (ContainerResourceDecreaseProto c : list) { + this.containersToDecrease.add(convertFromProtoFormat(c)); + } + } + + private void addDecreaseContainersToProto() { + maybeInitBuilder(); + builder.clearDecreaseContainers(); + if (this.containersToDecrease == null) { + return; + } + Iterable iterable = + new Iterable() { + @Override + public Iterator iterator() { + return new Iterator() { + Iterator iter = containersToDecrease + .iterator(); + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public ContainerResourceDecreaseProto next() { + return convertToProtoFormat(iter.next()); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }; + builder.addAllDecreaseContainers(iterable); + } + + private Token convertFromProtoFormat(TokenProto p) { + return new TokenPBImpl(p); + } + + private TokenProto convertToProtoFormat(Token t) { + return ((TokenPBImpl) t).getProto(); + } + + private ContainerResourceDecrease convertFromProtoFormat( + ContainerResourceDecreaseProto p) { + return new ContainerResourceDecreasePBImpl(p); + } + + private ContainerResourceDecreaseProto convertToProtoFormat( + ContainerResourceDecrease t) { + return ((ContainerResourceDecreasePBImpl) t).getProto(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/ChangeContainersResourceResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/ChangeContainersResourceResponsePBImpl.java new file mode 100644 index 0000000..12f89c5 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/ChangeContainersResourceResponsePBImpl.java @@ -0,0 +1,242 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceResponse; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.ChangeContainersResourceResponseProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.ChangeContainersResourceResponseProtoOrBuilder; + +import com.google.protobuf.TextFormat; + +@Private +@Unstable +public class ChangeContainersResourceResponsePBImpl extends + ChangeContainersResourceResponse { + ChangeContainersResourceResponseProto proto = + ChangeContainersResourceResponseProto.getDefaultInstance(); + ChangeContainersResourceResponseProto.Builder builder = null; + boolean viaProto = false; + + private List succeedChangedContainers = null; + private List failedChangedContainers = null; + + public ChangeContainersResourceResponsePBImpl() { + builder = ChangeContainersResourceResponseProto.newBuilder(); + } + + public ChangeContainersResourceResponsePBImpl( + ChangeContainersResourceResponseProto proto) { + this.proto = proto; + viaProto = true; + } + + public ChangeContainersResourceResponseProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) { + return false; + } + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + + private void mergeLocalToBuilder() { + if (this.succeedChangedContainers != null) { + addSucceedChangedContainersToProto(); + } + if (this.failedChangedContainers != null) { + addFailedChangedContainersToProto(); + } + } + + private void mergeLocalToProto() { + if (viaProto) { + maybeInitBuilder(); + } + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = ChangeContainersResourceResponseProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public List getSucceedChangedContainers() { + initSucceedChangedContainers(); + return this.succeedChangedContainers; + } + + @Override + public void setSucceedChangedContainers(List containers) { + if (containers == null) { + return; + } + initSucceedChangedContainers(); + this.succeedChangedContainers.clear(); + this.succeedChangedContainers.addAll(containers); + } + + private void initSucceedChangedContainers() { + if (this.succeedChangedContainers != null) { + return; + } + ChangeContainersResourceResponseProtoOrBuilder p = + viaProto ? proto : builder; + List list = p.getSucceedChangedContainersList(); + this.succeedChangedContainers = new ArrayList(); + + for (ContainerIdProto c : list) { + this.succeedChangedContainers.add(convertFromProtoFormat(c)); + } + } + + private void addSucceedChangedContainersToProto() { + maybeInitBuilder(); + builder.clearSucceedChangedContainers(); + if (this.succeedChangedContainers == null) { + return; + } + Iterable iterable = new Iterable() { + @Override + public Iterator iterator() { + return new Iterator() { + Iterator iter = succeedChangedContainers.iterator(); + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public ContainerIdProto next() { + return convertToProtoFormat(iter.next()); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }; + builder.addAllSucceedChangedContainers(iterable); + } + + @Override + public List getFailedChangedContainers() { + initFailedChangedContainers(); + return this.failedChangedContainers; + } + + @Override + public void setFailedChangedContainers(List containers) { + if (containers == null) { + return; + } + initFailedChangedContainers(); + this.failedChangedContainers.clear(); + this.failedChangedContainers.addAll(containers); + } + + private void initFailedChangedContainers() { + if (this.failedChangedContainers != null) { + return; + } + ChangeContainersResourceResponseProtoOrBuilder p = + viaProto ? proto : builder; + List list = p.getFailedChangedContainersList(); + this.failedChangedContainers = new ArrayList(); + + for (ContainerIdProto c : list) { + this.failedChangedContainers.add(convertFromProtoFormat(c)); + } + } + + private void addFailedChangedContainersToProto() { + maybeInitBuilder(); + builder.clearFailedChangedContainers(); + if (this.failedChangedContainers == null) { + return; + } + Iterable iterable = new Iterable() { + @Override + public Iterator iterator() { + return new Iterator() { + Iterator iter = failedChangedContainers.iterator(); + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public ContainerIdProto next() { + return convertToProtoFormat(iter.next()); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }; + builder.addAllFailedChangedContainers(iterable); + } + + private ContainerId convertFromProtoFormat(ContainerIdProto p) { + return new ContainerIdPBImpl(p); + } + + private ContainerIdProto convertToProtoFormat(ContainerId t) { + return ((ContainerIdPBImpl) t).getProto(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java index 8fe5c3c..6771d88 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java @@ -33,6 +33,8 @@ import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; @@ -165,5 +167,12 @@ public GetContainerStatusesResponse getContainerStatuses( GetContainerStatusesResponse.newInstance(list, null); return null; } + + @Override + public ChangeContainersResourceResponse changeContainersResource( + ChangeContainersResourceRequest request) throws YarnException, + IOException { + return null; + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java index 76384d3..8ae6a9d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java @@ -35,6 +35,8 @@ import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.ContainerManagementProtocolPB; +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; @@ -218,6 +220,13 @@ public StopContainersResponse stopContainers(StopContainersRequest request) new Exception(EXCEPTION_CAUSE)); throw new YarnException(e); } + + @Override + public ChangeContainersResourceResponse changeContainersResource( + ChangeContainersResourceRequest request) throws YarnException, + IOException { + return null; + } } public static ContainerTokenIdentifier newContainerTokenIdentifier( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestResourceChangeRPC.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestResourceChangeRPC.java new file mode 100644 index 0000000..61102ce --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestResourceChangeRPC.java @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; + +import junit.framework.Assert; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.yarn.api.ContainerManagementProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; +import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerResourceDecrease; +import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC; +import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.junit.Test; + +public class TestResourceChangeRPC { + + static final Log LOG = LogFactory.getLog(TestContainerLaunchRPC.class); + + @Test + public void testResourceChangeRPC() throws Exception { + testResourceChange(HadoopYarnProtoRPC.class.getName()); + } + + private void testResourceChange(String rpcClass) throws Exception { + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.IPC_RPC_IMPL, rpcClass); + YarnRPC rpc = YarnRPC.create(conf); + String bindAddr = "localhost:0"; + InetSocketAddress addr = NetUtils.createSocketAddr(bindAddr); + Server server = + rpc.getServer(ContainerManagementProtocol.class, + new DummyContainerManager(), addr, conf, null, 1); + server.start(); + try { + + ContainerManagementProtocol proxy = + (ContainerManagementProtocol) rpc.getProxy( + ContainerManagementProtocol.class, server.getListenerAddress(), + conf); + + List inc = new ArrayList(); + List dec = + new ArrayList(); + + for (int i = 0; i < 3; i++) { + inc.add(Token.newInstance("id".getBytes(), "pb", "passwd".getBytes(), + "service")); + } + for (int i = 0; i < 5; i++) { + dec.add(ContainerResourceDecrease.newInstance(null, null)); + } + + ChangeContainersResourceRequest req = + ChangeContainersResourceRequest.newInstance(inc, dec); + + try { + ChangeContainersResourceResponse res = + proxy.changeContainersResource(req); + Assert.assertEquals(inc.size(), res.getSucceedChangedContainers() + .size()); + Assert + .assertEquals(dec.size(), res.getFailedChangedContainers().size()); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } finally { + server.stop(); + } + } + + public class DummyContainerManager implements ContainerManagementProtocol { + + @Override + public StartContainersResponse startContainers( + StartContainersRequest requests) throws YarnException, IOException { + return null; + } + + @Override + public StopContainersResponse + stopContainers(StopContainersRequest requests) throws YarnException, + IOException { + return null; + } + + @Override + public GetContainerStatusesResponse getContainerStatuses( + GetContainerStatusesRequest request) throws YarnException, IOException { + return null; + } + + @Override + public ChangeContainersResourceResponse changeContainersResource( + ChangeContainersResourceRequest request) throws YarnException, + IOException { + ContainerId cid = ContainerId.newInstance(null, 0); + List inc = new ArrayList(); + List dec = new ArrayList(); + for (int i = 0; i < request.getContainersToIncrease().size(); i++) { + inc.add(cid); + } + for (int i = 0; i < request.getContainersToDecrease().size(); i++) { + dec.add(cid); + } + + ChangeContainersResourceResponse res = + ChangeContainersResourceResponse.newInstance(inc, dec); + return res; + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestChangeContainersResourceRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestChangeContainersResourceRequest.java new file mode 100644 index 0000000..3087c14 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestChangeContainersResourceRequest.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api; + +import java.util.ArrayList; +import java.util.List; + +import junit.framework.Assert; + +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ChangeContainersResourceRequestPBImpl; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerResourceDecrease; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.ChangeContainersResourceRequestProto; +import org.junit.Test; + +public class TestChangeContainersResourceRequest { + int id = 0; + + private ContainerResourceDecrease getNextResourceChangeContext() { + ContainerId containerId = + ContainerId.newInstance(ApplicationAttemptId.newInstance( + ApplicationId.newInstance(1234, 3), 3), id++); + Resource resource = Resource.newInstance(1023, 3); + ContainerResourceDecrease rcContext = + ContainerResourceDecrease.newInstance(containerId, resource); + return rcContext; + } + + @Test + public void testChangeContainersResourceRequest() { + List containerToIncrease = new ArrayList(); + List containerToDecrease = + new ArrayList(); + + for (int i = 0; i < 10; i++) { + Token ctx = + Token.newInstance("identifier".getBytes(), "simple", + "passwd".getBytes(), "service"); + containerToIncrease.add(ctx); + } + for (int i = 0; i < 5; i++) { + ContainerResourceDecrease ctx = getNextResourceChangeContext(); + containerToDecrease.add(ctx); + } + + ChangeContainersResourceRequest request = + ChangeContainersResourceRequest.newInstance(containerToIncrease, + containerToDecrease); + + // serde + ChangeContainersResourceRequestProto proto = + ((ChangeContainersResourceRequestPBImpl) request).getProto(); + request = new ChangeContainersResourceRequestPBImpl(proto); + + // check value + Assert.assertEquals(request.getContainersToIncrease().size(), + containerToIncrease.size()); + Assert.assertEquals(request.getContainersToDecrease().size(), + containerToDecrease.size()); + for (int i = 0; i < containerToDecrease.size(); i++) { + Assert.assertEquals(request.getContainersToDecrease().get(i), + containerToDecrease.get(i)); + } + } + + @Test + public void testChangeContainersResourceRequestWithNull() { + ChangeContainersResourceRequest request = + ChangeContainersResourceRequest.newInstance(null, null); + + // serde + ChangeContainersResourceRequestProto proto = + ((ChangeContainersResourceRequestPBImpl) request).getProto(); + request = new ChangeContainersResourceRequestPBImpl(proto); + + // check value + Assert.assertEquals(0, request.getContainersToIncrease().size()); + Assert.assertEquals(0, request.getContainersToDecrease().size()); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestChangeContainersResourceResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestChangeContainersResourceResponse.java new file mode 100644 index 0000000..2af49c1 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestChangeContainersResourceResponse.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api; + +import java.util.ArrayList; +import java.util.List; + +import junit.framework.Assert; + +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceResponse; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ChangeContainersResourceResponsePBImpl; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.ChangeContainersResourceResponseProto; +import org.junit.Test; + +public class TestChangeContainersResourceResponse { + int id = 0; + + private ContainerId getNextContainerId() { + ContainerId containerId = + ContainerId.newInstance(ApplicationAttemptId.newInstance( + ApplicationId.newInstance(1234, 3), 3), id++); + return containerId; + } + + @Test + public void testChangeContainersResourceRequest() { + List succeedChanged = new ArrayList(); + List failChanged = new ArrayList(); + + for (int i = 0; i < 10; i++) { + succeedChanged.add(getNextContainerId()); + } + + for (int i = 0; i < 8; i++) { + failChanged.add(getNextContainerId()); + } + + ChangeContainersResourceResponse response = + ChangeContainersResourceResponse.newInstance(succeedChanged, + failChanged); + + // serde + ChangeContainersResourceResponseProto proto = + ((ChangeContainersResourceResponsePBImpl) response).getProto(); + response = new ChangeContainersResourceResponsePBImpl(proto); + + // check value + Assert.assertEquals(response.getSucceedChangedContainers().size(), + succeedChanged.size()); + Assert.assertEquals(response.getFailedChangedContainers().size(), + failChanged.size()); + for (int i = 0; i < succeedChanged.size(); i++) { + Assert.assertEquals(response.getSucceedChangedContainers().get(i), + succeedChanged.get(i)); + } + for (int i = 0; i < failChanged.size(); i++) { + Assert.assertEquals(response.getFailedChangedContainers().get(i), + failChanged.get(i)); + } + } + + @Test + public void testChangeContainersResourceRequestWithNull() { + ChangeContainersResourceResponse request = + ChangeContainersResourceResponse.newInstance(null, null); + + // serde + ChangeContainersResourceResponseProto proto = + ((ChangeContainersResourceResponsePBImpl) request).getProto(); + request = new ChangeContainersResourceResponsePBImpl(proto); + + // check value + Assert.assertEquals(0, request.getSucceedChangedContainers().size()); + Assert.assertEquals(0, request.getFailedChangedContainers().size()); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java index aad819d..bde5c14 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java @@ -20,23 +20,25 @@ import java.util.List; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerResourceDecrease; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.util.Records; - public abstract class NodeStatus { public static NodeStatus newInstance(NodeId nodeId, int responseId, List containerStatuses, List keepAliveApplications, - NodeHealthStatus nodeHealthStatus) { + NodeHealthStatus nodeHealthStatus, + List decreasedContainers) { NodeStatus nodeStatus = Records.newRecord(NodeStatus.class); nodeStatus.setResponseId(responseId); nodeStatus.setNodeId(nodeId); nodeStatus.setContainersStatuses(containerStatuses); nodeStatus.setKeepAliveApplications(keepAliveApplications); nodeStatus.setNodeHealthStatus(nodeHealthStatus); + nodeStatus.setDecreasedContainers(decreasedContainers); return nodeStatus; } @@ -55,4 +57,9 @@ public abstract void setContainersStatuses( public abstract void setNodeId(NodeId nodeId); public abstract void setResponseId(int responseId); + + public abstract void setDecreasedContainers( + List decreasedContainers); + + public abstract List getDecreasedContainers(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java index 65376dc..c10ce74 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java @@ -24,12 +24,15 @@ import java.util.List; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerResourceDecrease; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ContainerResourceDecreasePBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerResourceDecreaseProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeHealthStatusProto; @@ -48,6 +51,7 @@ private List containers = null; private NodeHealthStatus nodeHealthStatus = null; private List keepAliveApplications = null; + private List newDecreasedContainers = null; public NodeStatusPBImpl() { builder = NodeStatusProto.newBuilder(); @@ -78,6 +82,9 @@ private synchronized void mergeLocalToBuilder() { if (this.keepAliveApplications != null) { addKeepAliveApplicationsToProto(); } + if (this.newDecreasedContainers != null) { + addNewDecreasedContainersToProto(); + } } private synchronized void mergeLocalToProto() { @@ -163,6 +170,43 @@ public void remove() { }; builder.addAllKeepAliveApplications(iterable); } + + private synchronized void addNewDecreasedContainersToProto() { + maybeInitBuilder(); + builder.clearNewDecreasedContainers(); + if (newDecreasedContainers == null) + return; + Iterable iterable = + new Iterable() { + @Override + public Iterator iterator() { + return new Iterator() { + + Iterator iter = newDecreasedContainers + .iterator(); + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public ContainerResourceDecreaseProto next() { + return convertToProtoFormat(iter.next()); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + + } + }; + + } + }; + builder.addAllNewDecreasedContainers(iterable); + } + @Override public int hashCode() { @@ -291,6 +335,36 @@ public synchronized void setNodeHealthStatus(NodeHealthStatus healthStatus) { this.nodeHealthStatus = healthStatus; } + @Override + public synchronized void setDecreasedContainers( + List decreasedContainers) { + if (decreasedContainers == null) { + builder.clearNewDecreasedContainers(); + } + this.newDecreasedContainers = decreasedContainers; + } + + @Override + public synchronized List getDecreasedContainers() { + initDecreasedContainers(); + return this.newDecreasedContainers; + } + + private synchronized void initDecreasedContainers() { + if (this.newDecreasedContainers != null) { + return; + } + NodeStatusProtoOrBuilder p = viaProto ? proto : builder; + List list = + p.getNewDecreasedContainersList(); + this.newDecreasedContainers = new ArrayList(); + + for (ContainerResourceDecreaseProto c : list) { + this.newDecreasedContainers.add(convertFromProtoFormat(c)); + } + + } + private NodeIdProto convertToProtoFormat(NodeId nodeId) { return ((NodeIdPBImpl)nodeId).getProto(); } @@ -316,6 +390,16 @@ private ContainerStatusProto convertToProtoFormat(ContainerStatus c) { return ((ContainerStatusPBImpl)c).getProto(); } + private ContainerResourceDecreaseProto convertToProtoFormat( + ContainerResourceDecrease t) { + return ((ContainerResourceDecreasePBImpl) t).getProto(); + } + + private ContainerResourceDecrease convertFromProtoFormat( + ContainerResourceDecreaseProto p) { + return new ContainerResourceDecreasePBImpl(p); + } + private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto c) { return new ApplicationIdPBImpl(c); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto index 4f5d168..ed93639 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto @@ -36,6 +36,7 @@ message NodeStatusProto { repeated ContainerStatusProto containersStatuses = 3; optional NodeHealthStatusProto nodeHealthStatus = 4; repeated ApplicationIdProto keep_alive_applications = 5; + repeated ContainerResourceDecreaseProto new_decreased_containers = 6; } message MasterKeyProto { @@ -47,4 +48,5 @@ message NodeHealthStatusProto { optional bool is_node_healthy = 1; optional string health_report = 2; optional int64 last_health_report_time = 3; -} \ No newline at end of file +} + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestNodeStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestNodeStatus.java new file mode 100644 index 0000000..06202f3 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestNodeStatus.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.protocolrecords; + +import java.util.ArrayList; +import java.util.List; + +import junit.framework.Assert; + +import org.apache.hadoop.yarn.api.records.ContainerResourceDecrease; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; +import org.apache.hadoop.yarn.server.api.records.impl.pb.NodeStatusPBImpl; +import org.junit.Test; + +public class TestNodeStatus { + @Test + public void testNodeStatusWithDecreasedContainers() { + RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); + NodeStatus nodeStatus = recordFactory.newRecordInstance(NodeStatus.class); + + // add 3 instances + List list = + new ArrayList(); + list.add(ContainerResourceDecrease.newInstance(null, null)); + list.add(ContainerResourceDecrease.newInstance(null, null)); + list.add(ContainerResourceDecrease.newInstance(null, null)); + nodeStatus.setDecreasedContainers(list); + + // serde + NodeStatusProto proto = ((NodeStatusPBImpl) nodeStatus).getProto(); + nodeStatus = new NodeStatusPBImpl(proto); + + // check value + Assert.assertEquals(list.size(), nodeStatus.getDecreasedContainers() + .size()); + } + + @Test + public void testNodeStatusWithoutDecreasedContainers() { + RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); + NodeStatus nodeStatus = recordFactory.newRecordInstance(NodeStatus.class); + + // serde + NodeStatusProto proto = ((NodeStatusPBImpl) nodeStatus).getProto(); + nodeStatus = new NodeStatusPBImpl(proto); + + // check value + Assert.assertEquals(0, nodeStatus.getDecreasedContainers().size()); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java index 729e043..b690d53 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java @@ -18,11 +18,13 @@ package org.apache.hadoop.yarn.server.nodemanager; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentMap; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerResourceDecrease; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; @@ -66,4 +68,6 @@ LocalDirsHandlerService getLocalDirsHandler(); ApplicationACLsManager getApplicationACLsManager(); + + BlockingQueue getDecreasedContainers(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index a169c12..7444a2b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -19,9 +19,11 @@ package org.apache.hadoop.yarn.server.nodemanager; import java.io.IOException; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; @@ -40,6 +42,7 @@ import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerResourceDecrease; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; @@ -242,6 +245,8 @@ public void run() { new ConcurrentHashMap(); private final ConcurrentMap containers = new ConcurrentSkipListMap(); + private final BlockingQueue decreasedContainers = + new LinkedBlockingQueue(); private final NMContainerTokenSecretManager containerTokenSecretManager; private final NMTokenSecretManagerInNM nmTokenSecretManager; @@ -328,6 +333,11 @@ public LocalDirsHandlerService getLocalDirsHandler() { public ApplicationACLsManager getApplicationACLsManager() { return aclsManager; } + + @Override + public BlockingQueue getDecreasedContainers() { + return decreasedContainers; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index aaf6ceb..e392b94 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.NoSuchElementException; import java.util.Random; import org.apache.commons.logging.Log; @@ -40,6 +41,7 @@ import org.apache.hadoop.util.VersionUtil; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerResourceDecrease; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; @@ -338,10 +340,25 @@ public NodeStatus getNodeStatusAndUpdateContainersInContext( LOG.debug(this.nodeId + " sending out status for " + containersStatuses.size() + " containers"); NodeStatus nodeStatus = NodeStatus.newInstance(nodeId, responseId, - containersStatuses, createKeepAliveApplicationList(), nodeHealthStatus); + containersStatuses, createKeepAliveApplicationList(), nodeHealthStatus, + pullDecreasedContainers()); return nodeStatus; } + + private List pullDecreasedContainers() { + List decreasedContainers = + new ArrayList(); + ContainerResourceDecrease item = null; + try { + while (null != (item = context.getDecreasedContainers().remove())) { + decreasedContainers.add(item); + } + } catch (NoSuchElementException e) { + // do nothing + } + return decreasedContainers; + } /* * It will return current container statuses. If any container has diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index dd3deb3..52f7fb2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -55,6 +55,8 @@ import org.apache.hadoop.service.ServiceStateChangeListener; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; @@ -65,9 +67,11 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerResourceDecrease; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.SerializedException; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; @@ -112,6 +116,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.NonAggregatingLogHandler; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerChangeMonitoringEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl; @@ -907,4 +912,125 @@ public Context getContext() { public Map getAuxServiceMetaData() { return this.auxiliaryServices.getMetaData(); } + + @Private + @VisibleForTesting + protected void authorizeIncreaseRequest(NMTokenIdentifier nmTokenIdentifier, + ContainerTokenIdentifier containerTokenIdentifier) throws YarnException { + ContainerId containerId = containerTokenIdentifier.getContainerID(); + boolean unauthorized = false; + StringBuilder messageBuilder = + new StringBuilder("Unauthorized request to change container. "); + if (!nmTokenIdentifier.getApplicationAttemptId().equals( + containerId.getApplicationAttemptId())) { + unauthorized = true; + messageBuilder.append("\nNMToken for application attempt : ") + .append(nmTokenIdentifier.getApplicationAttemptId()) + .append(" was used for starting container with container token") + .append(" issued for application attempt : ") + .append(containerId.getApplicationAttemptId()); + } + if (unauthorized) { + String msg = messageBuilder.toString(); + LOG.error(msg); + throw RPCUtil.getRemoteException(msg); + } + } + + private void internalChangeContainerResource(ContainerId containerId, + Resource resource, List succeedChangedContainers, + List failedChangedContainers, + ContainerResourceDecrease decrease) throws YarnException { + // check container's existence + if (context.getContainers().get(containerId) == null) { + LOG.error("failed to increase size of container because" + + " this container not existed in this node:" + + containerId.toString()); + failedChangedContainers.add(containerId); + return; + } + + // check container's state + org.apache.hadoop.yarn.server.nodemanager. + containermanager.container.ContainerState currentState = + context.getContainers().get(containerId).getContainerState(); + if (currentState != org.apache.hadoop.yarn.server. + nodemanager.containermanager.container.ContainerState.RUNNING) { + LOG.warn("We can only increase size of container in" + + " RUNNING state, containerId=" + containerId.toString() + + " current state is:" + currentState.name()); + failedChangedContainers.add(containerId); + return; + } + + // create ChangeContainerMonitoringEvent + long pmemBytes = resource.getMemory() * 1024 * 1024L; + float pmemRatio = + getConfig().getFloat(YarnConfiguration.NM_VMEM_PMEM_RATIO, + YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO); + long vmemBytes = (long) (pmemRatio * pmemBytes); + ContainerChangeMonitoringEvent monitorEvent = + new ContainerChangeMonitoringEvent(containerId, vmemBytes, pmemBytes); + containersMonitor.handle(monitorEvent); + + // We can consider the change successful + succeedChangedContainers.add(containerId); + + // if it's a succeed decreased container, add it to context, this will be + // used by NodeStatusUpdater + if (null != decrease) { + if (!context.getDecreasedContainers().offer(decrease)) { + LOG.error("Failed to decreased container" + + " to queue in NMContext, this shouldn't happen."); + throw new YarnException("Failed to decreased container" + + " to queue in NMContext, this shouldn't happen."); + } + } + } + + @Override + public ChangeContainersResourceResponse changeContainersResource( + ChangeContainersResourceRequest request) throws YarnException, + IOException { + // get NMToken identifier + UserGroupInformation remoteUgi = getRemoteUgi(); + NMTokenIdentifier nmIdentifier = selectNMTokenIdentifier(remoteUgi); + + List succeedChangedContainers = new ArrayList(); + List failedChangedContainers = new ArrayList(); + + // loop all decrease request, try to change their resource limits + for (ContainerResourceDecrease r : request.getContainersToDecrease()) { + if (r.getContainerId() != null && r.getCapability() != null) { + internalChangeContainerResource(r.getContainerId(), r.getCapability(), + succeedChangedContainers, failedChangedContainers, r); + } + } + + // loop all increase request to check + for (org.apache.hadoop.yarn.api.records.Token token : request + .getContainersToIncrease()) { + ContainerTokenIdentifier containerTokenIdentifier = + BuilderUtils.newContainerTokenIdentifier(token); + + ContainerId containerId = containerTokenIdentifier.getContainerID(); + Resource resource = containerTokenIdentifier.getResource(); + try { + authorizeIncreaseRequest(nmIdentifier, containerTokenIdentifier); + } catch (YarnException e) { + failedChangedContainers.add(containerTokenIdentifier.getContainerID()); + continue; + } + + // change container monitoring size, pass null in last parameter to + // indicate it's not a decrease container request + internalChangeContainerResource(containerId, resource, + succeedChangedContainers, failedChangedContainers, null); + } + + ChangeContainersResourceResponse response = + ChangeContainersResourceResponse.newInstance(succeedChangedContainers, + failedChangedContainers); + return response; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerChangeMonitoringEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerChangeMonitoringEvent.java new file mode 100644 index 0000000..cb7bdb2 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerChangeMonitoringEvent.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor; + +import org.apache.hadoop.yarn.api.records.ContainerId; + +public class ContainerChangeMonitoringEvent extends ContainersMonitorEvent { + private final long vmemLimit; + private final long pmemLimit; + + public ContainerChangeMonitoringEvent(ContainerId containerId, + long vmemLimit, long pmemLimit) { + super(containerId, ContainersMonitorEventType.CHANGE_MONITORING_CONTAINER); + this.vmemLimit = vmemLimit; + this.pmemLimit = pmemLimit; + } + + public long getVmemLimit() { + return this.vmemLimit; + } + + public long getPmemLimit() { + return this.pmemLimit; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorEventType.java index be99651..9d73745 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorEventType.java @@ -20,5 +20,6 @@ public enum ContainersMonitorEventType { START_MONITORING_CONTAINER, - STOP_MONITORING_CONTAINER + STOP_MONITORING_CONTAINER, + CHANGE_MONITORING_CONTAINER } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java index b681b34..8ba371a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java @@ -55,6 +55,7 @@ final Map containersToBeAdded; Map trackingContainers = new HashMap(); + final List containersToBeChanged; final ContainerExecutor containerExecutor; private final Dispatcher eventDispatcher; @@ -81,6 +82,7 @@ public ContainersMonitorImpl(ContainerExecutor exec, this.containersToBeAdded = new HashMap(); this.containersToBeRemoved = new ArrayList(); + this.containersToBeChanged = new ArrayList(); this.monitoringThread = new MonitoringThread(); } @@ -195,7 +197,7 @@ protected void serviceStop() throws Exception { super.serviceStop(); } - private static class ProcessTreeInfo { + static class ProcessTreeInfo { private ContainerId containerId; private String pid; private ResourceCalculatorProcessTree pTree; @@ -306,6 +308,19 @@ boolean isProcessTreeOverLimit(ResourceCalculatorProcessTree pTree, return isProcessTreeOverLimit(containerId, currentMemUsage, curMemUsageOfAgedProcesses, limit); } + + static class ContainerResourceToBeChanged { + public ContainerResourceToBeChanged(ContainerId containerId, + long vmemLimit, long pmemLimit) { + this.containerId = containerId; + this.vmemLimit = vmemLimit; + this.pmemLimit = pmemLimit; + } + + ContainerId containerId; + long vmemLimit; + long pmemLimit; + } private class MonitoringThread extends Thread { public MonitoringThread() { @@ -348,6 +363,25 @@ public void run() { } containersToBeRemoved.clear(); } + + // handle containers to be changed + synchronized (containersToBeChanged) { + for (ContainerResourceToBeChanged c : containersToBeChanged) { + // if c is not in trackingContainers, it maybe removed (finished or + // killed) already, just skip + if (trackingContainers.get(c.containerId) == null) { + LOG.warn("note container not found in trackingContainers, " + + "it maybe already completed, please check, Container=" + + c.containerId.toString() + + " that was to be resized is no longer running"); + continue; + } + ProcessTreeInfo info = trackingContainers.get(c.containerId); + info.vmemLimit = c.vmemLimit; + info.pmemLimit = c.pmemLimit; + } + containersToBeChanged.clear(); + } // Now do the monitoring for the trackingContainers // Check memory usage and kill any overflowing containers @@ -547,6 +581,13 @@ public void handle(ContainersMonitorEvent monitoringEvent) { this.containersToBeRemoved.add(containerId); } break; + case CHANGE_MONITORING_CONTAINER: + ContainerChangeMonitoringEvent event = + (ContainerChangeMonitoringEvent) monitoringEvent; + synchronized (this.containersToBeChanged) { + this.containersToBeChanged.add(new ContainerResourceToBeChanged( + containerId, event.getVmemLimit(), event.getPmemLimit())); + } default: // TODO: Wrong event. } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java index a47e7f7..ebfc603 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java @@ -203,4 +203,10 @@ protected ContainerExecutor createContainerExecutor() { linuxContainerExecutor.setConf(super.conf); return linuxContainerExecutor; } + + @Override + public void testContainersChangeResource() { + LOG.info("not running this test."); + return; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java index f62cd50..6ba2371 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java @@ -40,6 +40,8 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.Service; import org.apache.hadoop.util.Shell; +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; @@ -791,6 +793,69 @@ public void testStartContainerFailureWithUnknownAuxService() throws Exception { Assert.assertTrue(response.getFailedRequests().get(cId).getMessage() .contains("The auxService:" + serviceName + " does not exist")); } + + @Test + public void testContainersChangeResource() throws Exception { + containerManager.start(); + + // first we will start containers 0..4 + List list = new ArrayList(); + ContainerLaunchContext containerLaunchContext = recordFactory + .newRecordInstance(ContainerLaunchContext.class); + for (int i = 0; i < 4; i++) { + ContainerId cId = createContainerId(i); + long identifier = DUMMY_RM_IDENTIFIER; + Token containerToken = createContainerToken(cId, identifier, + context.getNodeId(), user, context.getContainerTokenSecretManager()); + StartContainerRequest request = StartContainerRequest.newInstance( + containerLaunchContext, containerToken); + list.add(request); + } + StartContainersRequest requestList = StartContainersRequest + .newInstance(list); + + StartContainersResponse response = containerManager + .startContainers(requestList); + + Assert.assertEquals(4, response.getSuccessfullyStartedContainers().size()); + for (int i = 0; i < response.getSuccessfullyStartedContainers().size(); i++) { + Assert.assertEquals(i, response.getSuccessfullyStartedContainers().get(i) + .getId()); + } + + // we will create some change request, + List increaseTokens = new ArrayList(); + + // add change request for container-0, the resource not match because + // container's state in CM will not be RUNNING + ContainerId cId = createContainerId(0); + long identifier = DUMMY_RM_IDENTIFIER; + Token containerToken = + createContainerToken(cId, identifier, context.getNodeId(), user, + context.getContainerTokenSecretManager(), + Resource.newInstance(9999, 3)); + increaseTokens.add(containerToken); + + // add change request for container-7, the resource not match because + // container not existed + cId = createContainerId(7); + identifier = DUMMY_RM_IDENTIFIER; + containerToken = + createContainerToken(cId, identifier, context.getNodeId(), user, + context.getContainerTokenSecretManager(), + Resource.newInstance(9999, 3)); + increaseTokens.add(containerToken); + + ChangeContainersResourceRequest changeRequest = ChangeContainersResourceRequest + .newInstance(increaseTokens, null); + ChangeContainersResourceResponse changeResponse = containerManager + .changeContainersResource(changeRequest); + + // check response + Assert.assertEquals(2, changeResponse.getFailedChangedContainers().size()); + Assert.assertEquals(0, changeResponse.getFailedChangedContainers().get(0).getId()); + Assert.assertEquals(7, changeResponse.getFailedChangedContainers().get(1).getId()); + } public static Token createContainerToken(ContainerId cId, long rmIdentifier, NodeId nodeId, String user, @@ -807,4 +872,19 @@ public static Token createContainerToken(ContainerId cId, long rmIdentifier, containerTokenIdentifier); return containerToken; } + + public static Token createContainerToken(ContainerId cId, long rmIdentifier, + NodeId nodeId, String user, + NMContainerTokenSecretManager containerTokenSecretManager, Resource resource) + throws IOException { + ContainerTokenIdentifier containerTokenIdentifier = + new ContainerTokenIdentifier(cId, nodeId.toString(), user, resource, + System.currentTimeMillis() + 100000L, 123, rmIdentifier); + Token containerToken = + BuilderUtils + .newContainerToken(nodeId, containerTokenSecretManager + .retrievePassword(containerTokenIdentifier), + containerTokenIdentifier); + return containerToken; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/MockResourceCalculatorPlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/MockResourceCalculatorPlugin.java new file mode 100644 index 0000000..b560f40 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/MockResourceCalculatorPlugin.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor; + +import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin; + +public class MockResourceCalculatorPlugin extends ResourceCalculatorPlugin { + @Override + public long getVirtualMemorySize() { + return 0; + } + + @Override + public long getPhysicalMemorySize() { + return 0; + } + + @Override + public long getAvailableVirtualMemorySize() { + return 0; + } + + @Override + public long getAvailablePhysicalMemorySize() { + return 0; + } + + @Override + public int getNumProcessors() { + return 0; + } + + @Override + public long getCpuFrequency() { + return 0; + } + + @Override + public long getCumulativeCpuTime() { + return 0; + } + + @Override + public float getCpuUsage() { + return 0; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/MockResourceCalculatorProcessTree.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/MockResourceCalculatorProcessTree.java new file mode 100644 index 0000000..ace74a8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/MockResourceCalculatorProcessTree.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor; + +import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree; + +public class MockResourceCalculatorProcessTree extends ResourceCalculatorProcessTree { + long pmem = 0; + long vmem = 0; + + public MockResourceCalculatorProcessTree(String root) { + super(root); + } + + @Override + public void updateProcessTree() { + } + + @Override + public String getProcessTreeDump() { + return ""; + } + + @Override + public long getCumulativeVmem(int olderThanAge) { + return olderThanAge > 0 ? 0 : vmem; + } + + @Override + public long getCumulativeRssmem(int olderThanAge) { + return olderThanAge > 0 ? 0 : pmem; + } + + @Override + public long getCumulativeCpuTime() { + return 0; + } + + @Override + public boolean checkPidPgrpidForMatch() { + return true; + } + + public void setPmem(long pmem) { + this.pmem = pmem; + } + + public void setVmem(long vmem) { + this.vmem = vmem; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java new file mode 100644 index 0000000..8f5beeb --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java @@ -0,0 +1,221 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import junit.framework.Assert; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl.ProcessTreeInfo; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class TestContainersMonitorResourceChange { + ContainersMonitorImpl cm; + MockExecutor executor; + Configuration conf; + AsyncDispatcher dispatcher; + MockContainerEventHandler containerEventHandler; + + static class MockExecutor extends ContainerExecutor { + ConcurrentMap containerIdToPid = + new ConcurrentHashMap(); + + @Override + public void init() throws IOException { + } + + @Override + public void startLocalizer(Path nmPrivateContainerTokens, + InetSocketAddress nmAddr, String user, String appId, String locId, + List localDirs, List logDirs) throws IOException, + InterruptedException { + } + + @Override + public int launchContainer(Container container, + Path nmPrivateContainerScriptPath, Path nmPrivateTokensPath, + String user, String appId, Path containerWorkDir, + List localDirs, List logDirs) throws IOException { + return 0; + } + + @Override + public boolean signalContainer(String user, String pid, Signal signal) + throws IOException { + return true; + } + + @Override + public void deleteAsUser(String user, Path subDir, Path... basedirs) + throws IOException, InterruptedException { + } + + @Override + public String getProcessId(ContainerId containerID) { + return String.valueOf(containerID.getId()); + } + + public void setContainerPid(ContainerId containerId, String pid) { + containerIdToPid.put(containerId, pid); + } + } + + static class MockContainerEventHandler implements + EventHandler { + Set killedContainer = new HashSet(); + + @Override + public void handle(ContainerEvent event) { + if (event.getType() == ContainerEventType.KILL_CONTAINER) { + synchronized (killedContainer) { + killedContainer.add(event.getContainerID()); + } + } + } + + public boolean isContainerKilled(ContainerId containerId) { + synchronized (killedContainer) { + return killedContainer.contains(containerId); + } + } + } + + @Before + public void setup() { + executor = new MockExecutor(); + dispatcher = new AsyncDispatcher(); + conf = new Configuration(); + conf.set( + YarnConfiguration.NM_CONTAINER_MON_RESOURCE_CALCULATOR, + MockResourceCalculatorPlugin.class.getCanonicalName()); + conf.set( + YarnConfiguration.NM_CONTAINER_MON_PROCESS_TREE, + MockResourceCalculatorProcessTree.class.getCanonicalName()); + conf.setLong(YarnConfiguration.NM_CONTAINER_MON_INTERVAL_MS, 20L); + dispatcher.init(conf); + dispatcher.start(); + containerEventHandler = new MockContainerEventHandler(); + dispatcher.register(ContainerEventType.class, containerEventHandler); + cm = new ContainersMonitorImpl(executor, dispatcher, null); + cm.init(conf); + cm.start(); + } + + @After + public void finalize() { + try { + cm.stop(); + } catch (Exception e) { + // do nothing + } + try { + dispatcher.stop(); + } catch (Exception e) { + // do nothing + } + } + + private ContainerId getContainerId(int id) { + return ContainerId.newInstance(ApplicationAttemptId.newInstance( + ApplicationId.newInstance(123456L, 1), 1), id); + } + + private ProcessTreeInfo getProcessTreeInfo(ContainerId id) { + return cm.trackingContainers.get(id); + } + + @Test + public void testResourceChange() throws Exception { + // create a container-1 with + cm.handle(new ContainerStartMonitoringEvent( + getContainerId(1), 2100L, 1000L)); + + // check if this container is tracked and it's value + Thread.sleep(200); + Assert.assertNotNull(getProcessTreeInfo(getContainerId(1))); + Assert.assertEquals(1000L, getProcessTreeInfo(getContainerId(1)) + .getPmemLimit()); + Assert.assertEquals(2100L, getProcessTreeInfo(getContainerId(1)) + .getVmemLimit()); + + // increase size of pmem usage, it will be killed + MockResourceCalculatorProcessTree mockTree = + (MockResourceCalculatorProcessTree) getProcessTreeInfo( + getContainerId(1)).getProcessTree(); + mockTree.setPmem(2500L); + + // check if this container killed + Thread.sleep(200); + Assert.assertTrue(containerEventHandler + .isContainerKilled(getContainerId(1))); + + // create a container-2 with + cm.handle(new ContainerStartMonitoringEvent( + getContainerId(2), 2100L, 1000L)); + + // check if this container is tracked and it's value + Thread.sleep(200); + Assert.assertNotNull(getProcessTreeInfo(getContainerId(2))); + Assert.assertEquals(1000L, getProcessTreeInfo(getContainerId(2)) + .getPmemLimit()); + Assert.assertEquals(2100L, getProcessTreeInfo(getContainerId(2)) + .getVmemLimit()); + + // trigger a change resource event, check limit after changed + cm.handle(new ContainerChangeMonitoringEvent(getContainerId(2), 4200L, + 2000L)); + Thread.sleep(200); + Assert.assertNotNull(getProcessTreeInfo(getContainerId(2))); + Assert.assertEquals(2000L, getProcessTreeInfo(getContainerId(2)) + .getPmemLimit()); + Assert.assertEquals(4200L, getProcessTreeInfo(getContainerId(2)) + .getVmemLimit()); + + // increase size of pmem usage, it should NOT be killed + mockTree = + (MockResourceCalculatorProcessTree) getProcessTreeInfo( + getContainerId(2)).getProcessTree(); + mockTree.setPmem(2500L); + + // check if this container killed + Thread.sleep(200); + Assert.assertFalse(containerEventHandler + .isContainerKilled(getContainerId(2))); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java index c9e57a6..88bcf95 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java @@ -31,6 +31,8 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; @@ -308,4 +310,11 @@ synchronized public GetContainerStatusesResponse getContainerStatuses( nodeStatus.setNodeHealthStatus(nodeHealthStatus); return nodeStatus; } + + @Override + public ChangeContainersResourceResponse changeContainersResource( + ChangeContainersResourceRequest request) throws YarnException, + IOException { + return null; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAMAuthorization.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAMAuthorization.java index a9f1c1a..6f9a80f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAMAuthorization.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAMAuthorization.java @@ -39,6 +39,8 @@ import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; @@ -129,6 +131,13 @@ public Credentials getContainerCredentials() throws IOException { credentials.readTokenStorageStream(buf); return credentials; } + + @Override + public ChangeContainersResourceResponse changeContainersResource( + ChangeContainersResourceRequest request) throws YarnException, + IOException { + return null; + } } public static class MockRMWithAMS extends MockRMWithCustomAMLauncher { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java index 64e5cc9..90fe1bf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java @@ -29,6 +29,8 @@ import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; @@ -118,6 +120,13 @@ public GetContainerStatusesResponse getContainerStatuses( GetContainerStatusesRequest request) throws YarnException { return null; } + + @Override + public ChangeContainersResourceResponse changeContainersResource( + ChangeContainersResourceRequest request) throws YarnException, + IOException { + return null; + } } @Test