diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java index dd1060c..0736b10 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java @@ -54,6 +54,8 @@ import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; @@ -71,6 +73,7 @@ import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy; import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy.ContainerManagementProtocolProxyData; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC; @@ -438,5 +441,12 @@ public StopContainersResponse stopContainers(StopContainersRequest request) "Dummy function cause")); throw new IOException(e); } + + @Override + public ChangeContainersResourceResponse changeContainersResource( + ChangeContainersResourceRequest request) throws YarnException, + IOException { + return null; + } } } \ No newline at end of file diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java index 6f21c87..1f21568 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java @@ -45,6 +45,8 @@ import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher.EventType; import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; @@ -443,6 +445,12 @@ public GetContainerStatusesResponse getContainerStatuses( GetContainerStatusesRequest request) throws IOException { return null; } + @Override + public ChangeContainersResourceResponse changeContainersResource( + ChangeContainersResourceRequest request) throws YarnException, + IOException { + return null; + } } @SuppressWarnings("serial") diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ContainerManagementProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ContainerManagementProtocol.java index 7aa43df..8d84789 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ContainerManagementProtocol.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ContainerManagementProtocol.java @@ -22,6 +22,8 @@ import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; @@ -170,4 +172,16 @@ StopContainersResponse stopContainers(StopContainersRequest request) GetContainerStatusesResponse getContainerStatuses( GetContainerStatusesRequest request) throws YarnException, IOException; + + /** + * The API used by the ApplicationMaster to request change + * running containers in this node + * + * @throws YarnException + * @throws IOException + */ + @Public + ChangeContainersResourceResponse changeContainersResource( + ChangeContainersResourceRequest request) throws YarnException, + IOException; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ChangeContainersResourceRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ChangeContainersResourceRequest.java new file mode 100644 index 0000000..ab37f80 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ChangeContainersResourceRequest.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords; + +import java.util.List; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.yarn.api.ContainerManagementProtocol; +import org.apache.hadoop.yarn.api.records.ContainerResourceDecrease; +import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.util.Records; + +/** + *

+ * The request sent by Application Master to ask + * Node Manager change resource quota used by a specified container + *

+ * + * @see ContainerManagementProtocol#changeContainersResource(ChangeContainersResourceRequest) + */ +@Public +@Stable +public abstract class ChangeContainersResourceRequest { + @Public + @Stable + public static ChangeContainersResourceRequest newInstance( + List containersTokensToIncrease, + List containersToDecrease) { + ChangeContainersResourceRequest request = + Records.newRecord(ChangeContainersResourceRequest.class); + request.setContainersToIncrease(containersTokensToIncrease); + request.setContainersToDecrease(containersToDecrease); + return request; + } + + /** + * get tokens of container need to be increased. + */ + @Public + @Stable + public abstract List getContainersToIncrease(); + + /** + * set tokens of containers need to be increased, token is acquired in + * AllocateResponse.getIncreasedContainers. Here we only need + * the token in ContainerResourceIncrease because container id + * and capability are already contained in token + */ + @Public + @Stable + public abstract void setContainersToIncrease( + List containersTokensToIncrease); + + /** + * get decrease containers sent by ApplicationMaster + */ + @Public + @Stable + public abstract List getContainersToDecrease(); + + /** + * set containers need to be decreased by ApplicationMaster to + * NodeManager + */ + @Public + @Stable + public abstract void setContainersToDecrease( + List containersToDecrease); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ChangeContainersResourceResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ChangeContainersResourceResponse.java new file mode 100644 index 0000000..9bd7708 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ChangeContainersResourceResponse.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords; + +import java.util.List; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.util.Records; + +@Public +@Stable +public abstract class ChangeContainersResourceResponse { + @Public + @Stable + public static ChangeContainersResourceResponse newInstance( + List succeedChangedContainers, + List failedChangedContainers) { + ChangeContainersResourceResponse request = + Records.newRecord(ChangeContainersResourceResponse.class); + request.setSucceedChangedContainers(succeedChangedContainers); + request.setFailedChangedContainers(failedChangedContainers); + return request; + } + + @Public + @Stable + public abstract List getSucceedChangedContainers(); + + @Public + @Stable + public abstract void setSucceedChangedContainers( + List succeedIncreasedContainers); + + @Public + @Stable + public abstract List getFailedChangedContainers(); + + @Public + @Stable + public abstract void setFailedChangedContainers( + List succeedIncreasedContainers); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/containermanagement_protocol.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/containermanagement_protocol.proto index 7b1647b..dd7874a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/containermanagement_protocol.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/containermanagement_protocol.proto @@ -34,4 +34,5 @@ service ContainerManagementProtocolService { rpc startContainers(StartContainersRequestProto) returns (StartContainersResponseProto); rpc stopContainers(StopContainersRequestProto) returns (StopContainersResponseProto); rpc getContainerStatuses(GetContainerStatusesRequestProto) returns (GetContainerStatusesResponseProto); + rpc changeContainersResource(ChangeContainersResourceRequestProto) returns (ChangeContainersResourceResponseProto); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index 68d914e..d0bd3f9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -239,3 +239,13 @@ message GetContainerStatusesResponseProto { repeated ContainerStatusProto status = 1; repeated ContainerExceptionMapProto failed_requests = 2; } + +message ChangeContainersResourceRequestProto { + repeated hadoop.common.TokenProto increase_containers = 1; + repeated ContainerResourceDecreaseProto decrease_containers = 2; +} + +message ChangeContainersResourceResponseProto { + repeated ContainerIdProto succeed_changed_containers = 1; + repeated ContainerIdProto failed_changed_containers = 2; +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagementProtocolPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagementProtocolPBClientImpl.java index 15397e3..63e3612 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagementProtocolPBClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagementProtocolPBClientImpl.java @@ -30,12 +30,16 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.ContainerManagementProtocolPB; +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ChangeContainersResourceRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ChangeContainersResourceResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerStatusesRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerStatusesResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainersRequestPBImpl; @@ -45,6 +49,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.ipc.RPCUtil; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.ChangeContainersResourceRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerStatusesRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainersRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.StopContainersRequestProto; @@ -128,4 +133,17 @@ public GetContainerStatusesResponse getContainerStatuses( return null; } } + + @Override + public ChangeContainersResourceResponse changeContainersResource( + ChangeContainersResourceRequest request) throws YarnException, + IOException { + ChangeContainersResourceRequestProto requestProto = ((ChangeContainersResourceRequestPBImpl)request).getProto(); + try { + return new ChangeContainersResourceResponsePBImpl(proxy.changeContainersResource(null, requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + return null; + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ContainerManagementProtocolPBServiceImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ContainerManagementProtocolPBServiceImpl.java index 2d33e69..55a9d33 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ContainerManagementProtocolPBServiceImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ContainerManagementProtocolPBServiceImpl.java @@ -23,9 +23,12 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.ContainerManagementProtocolPB; +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ChangeContainersResourceRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ChangeContainersResourceResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerStatusesRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerStatusesResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainersRequestPBImpl; @@ -33,6 +36,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StopContainersRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StopContainersResponsePBImpl; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.ChangeContainersResourceRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.ChangeContainersResourceResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerStatusesRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerStatusesResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainersRequestProto; @@ -94,4 +99,19 @@ public GetContainerStatusesResponseProto getContainerStatuses( throw new ServiceException(e); } } + + @Override + public ChangeContainersResourceResponseProto changeContainersResource( + RpcController controller, ChangeContainersResourceRequestProto proto) + throws ServiceException { + ChangeContainersResourceRequestPBImpl request = new ChangeContainersResourceRequestPBImpl(proto); + try { + ChangeContainersResourceResponse response = real.changeContainersResource(request); + return ((ChangeContainersResourceResponsePBImpl)response).getProto(); + } catch (YarnException e) { + throw new ServiceException(e); + } catch (IOException e) { + throw new ServiceException(e); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/ChangeContainersResourceRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/ChangeContainersResourceRequestPBImpl.java new file mode 100644 index 0000000..45b5f89 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/ChangeContainersResourceRequestPBImpl.java @@ -0,0 +1,258 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.security.proto.SecurityProtos.TokenProto; +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceRequest; +import org.apache.hadoop.yarn.api.records.ContainerResourceDecrease; +import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.api.records.impl.pb.ContainerResourceDecreasePBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerResourceDecreaseProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.ChangeContainersResourceRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.ChangeContainersResourceRequestProtoOrBuilder; + +import com.google.protobuf.TextFormat; + +@Private +@Unstable +public class ChangeContainersResourceRequestPBImpl extends + ChangeContainersResourceRequest { + ChangeContainersResourceRequestProto proto = + ChangeContainersResourceRequestProto.getDefaultInstance(); + ChangeContainersResourceRequestProto.Builder builder = null; + boolean viaProto = false; + + private List containersToIncrease = null; + private List containersToDecrease = null; + + public ChangeContainersResourceRequestPBImpl() { + builder = ChangeContainersResourceRequestProto.newBuilder(); + } + + public ChangeContainersResourceRequestPBImpl( + ChangeContainersResourceRequestProto proto) { + this.proto = proto; + viaProto = true; + } + + public ChangeContainersResourceRequestProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) { + return false; + } + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + + private void mergeLocalToBuilder() { + if (this.containersToIncrease != null) { + addIncreaseContainersToProto(); + } + if (this.containersToDecrease != null) { + addDecreaseContainersToProto(); + } + } + + private void mergeLocalToProto() { + if (viaProto) { + maybeInitBuilder(); + } + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = ChangeContainersResourceRequestProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public List getContainersToIncrease() { + initContainersToIncrease(); + return this.containersToIncrease; + } + + @Override + public void setContainersToIncrease(List containersToIncrease) { + if (containersToIncrease == null) { + return; + } + initContainersToIncrease(); + this.containersToIncrease.clear(); + this.containersToIncrease.addAll(containersToIncrease); + } + + @Override + public List getContainersToDecrease() { + initContainersToDecrease(); + return this.containersToDecrease; + } + + @Override + public void setContainersToDecrease( + List containersToDecrease) { + if (containersToDecrease == null) { + return; + } + initContainersToDecrease(); + this.containersToDecrease.clear(); + this.containersToDecrease.addAll(containersToDecrease); + } + + private void initContainersToIncrease() { + if (this.containersToIncrease != null) { + return; + } + ChangeContainersResourceRequestProtoOrBuilder p = + viaProto ? proto : builder; + List list = p.getIncreaseContainersList(); + this.containersToIncrease = new ArrayList(); + + for (TokenProto c : list) { + this.containersToIncrease.add(convertFromProtoFormat(c)); + } + } + + private void addIncreaseContainersToProto() { + maybeInitBuilder(); + builder.clearIncreaseContainers(); + if (this.containersToIncrease == null) { + return; + } + Iterable iterable = new Iterable() { + @Override + public Iterator iterator() { + return new Iterator() { + Iterator iter = containersToIncrease.iterator(); + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public TokenProto next() { + return convertToProtoFormat(iter.next()); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }; + builder.addAllIncreaseContainers(iterable); + } + + private void initContainersToDecrease() { + if (this.containersToDecrease != null) { + return; + } + ChangeContainersResourceRequestProtoOrBuilder p = + viaProto ? proto : builder; + List list = p.getDecreaseContainersList(); + this.containersToDecrease = new ArrayList(); + + for (ContainerResourceDecreaseProto c : list) { + this.containersToDecrease.add(convertFromProtoFormat(c)); + } + } + + private void addDecreaseContainersToProto() { + maybeInitBuilder(); + builder.clearDecreaseContainers(); + if (this.containersToDecrease == null) { + return; + } + Iterable iterable = + new Iterable() { + @Override + public Iterator iterator() { + return new Iterator() { + Iterator iter = containersToDecrease + .iterator(); + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public ContainerResourceDecreaseProto next() { + return convertToProtoFormat(iter.next()); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }; + builder.addAllDecreaseContainers(iterable); + } + + private Token convertFromProtoFormat(TokenProto p) { + return new TokenPBImpl(p); + } + + private TokenProto convertToProtoFormat(Token t) { + return ((TokenPBImpl) t).getProto(); + } + + private ContainerResourceDecrease convertFromProtoFormat( + ContainerResourceDecreaseProto p) { + return new ContainerResourceDecreasePBImpl(p); + } + + private ContainerResourceDecreaseProto convertToProtoFormat( + ContainerResourceDecrease t) { + return ((ContainerResourceDecreasePBImpl) t).getProto(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/ChangeContainersResourceResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/ChangeContainersResourceResponsePBImpl.java new file mode 100644 index 0000000..12f89c5 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/ChangeContainersResourceResponsePBImpl.java @@ -0,0 +1,242 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceResponse; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.ChangeContainersResourceResponseProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.ChangeContainersResourceResponseProtoOrBuilder; + +import com.google.protobuf.TextFormat; + +@Private +@Unstable +public class ChangeContainersResourceResponsePBImpl extends + ChangeContainersResourceResponse { + ChangeContainersResourceResponseProto proto = + ChangeContainersResourceResponseProto.getDefaultInstance(); + ChangeContainersResourceResponseProto.Builder builder = null; + boolean viaProto = false; + + private List succeedChangedContainers = null; + private List failedChangedContainers = null; + + public ChangeContainersResourceResponsePBImpl() { + builder = ChangeContainersResourceResponseProto.newBuilder(); + } + + public ChangeContainersResourceResponsePBImpl( + ChangeContainersResourceResponseProto proto) { + this.proto = proto; + viaProto = true; + } + + public ChangeContainersResourceResponseProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) { + return false; + } + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + + private void mergeLocalToBuilder() { + if (this.succeedChangedContainers != null) { + addSucceedChangedContainersToProto(); + } + if (this.failedChangedContainers != null) { + addFailedChangedContainersToProto(); + } + } + + private void mergeLocalToProto() { + if (viaProto) { + maybeInitBuilder(); + } + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = ChangeContainersResourceResponseProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public List getSucceedChangedContainers() { + initSucceedChangedContainers(); + return this.succeedChangedContainers; + } + + @Override + public void setSucceedChangedContainers(List containers) { + if (containers == null) { + return; + } + initSucceedChangedContainers(); + this.succeedChangedContainers.clear(); + this.succeedChangedContainers.addAll(containers); + } + + private void initSucceedChangedContainers() { + if (this.succeedChangedContainers != null) { + return; + } + ChangeContainersResourceResponseProtoOrBuilder p = + viaProto ? proto : builder; + List list = p.getSucceedChangedContainersList(); + this.succeedChangedContainers = new ArrayList(); + + for (ContainerIdProto c : list) { + this.succeedChangedContainers.add(convertFromProtoFormat(c)); + } + } + + private void addSucceedChangedContainersToProto() { + maybeInitBuilder(); + builder.clearSucceedChangedContainers(); + if (this.succeedChangedContainers == null) { + return; + } + Iterable iterable = new Iterable() { + @Override + public Iterator iterator() { + return new Iterator() { + Iterator iter = succeedChangedContainers.iterator(); + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public ContainerIdProto next() { + return convertToProtoFormat(iter.next()); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }; + builder.addAllSucceedChangedContainers(iterable); + } + + @Override + public List getFailedChangedContainers() { + initFailedChangedContainers(); + return this.failedChangedContainers; + } + + @Override + public void setFailedChangedContainers(List containers) { + if (containers == null) { + return; + } + initFailedChangedContainers(); + this.failedChangedContainers.clear(); + this.failedChangedContainers.addAll(containers); + } + + private void initFailedChangedContainers() { + if (this.failedChangedContainers != null) { + return; + } + ChangeContainersResourceResponseProtoOrBuilder p = + viaProto ? proto : builder; + List list = p.getFailedChangedContainersList(); + this.failedChangedContainers = new ArrayList(); + + for (ContainerIdProto c : list) { + this.failedChangedContainers.add(convertFromProtoFormat(c)); + } + } + + private void addFailedChangedContainersToProto() { + maybeInitBuilder(); + builder.clearFailedChangedContainers(); + if (this.failedChangedContainers == null) { + return; + } + Iterable iterable = new Iterable() { + @Override + public Iterator iterator() { + return new Iterator() { + Iterator iter = failedChangedContainers.iterator(); + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public ContainerIdProto next() { + return convertToProtoFormat(iter.next()); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }; + builder.addAllFailedChangedContainers(iterable); + } + + private ContainerId convertFromProtoFormat(ContainerIdProto p) { + return new ContainerIdPBImpl(p); + } + + private ContainerIdProto convertToProtoFormat(ContainerId t) { + return ((ContainerIdPBImpl) t).getProto(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java index 8fe5c3c..6771d88 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java @@ -33,6 +33,8 @@ import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; @@ -165,5 +167,12 @@ public GetContainerStatusesResponse getContainerStatuses( GetContainerStatusesResponse.newInstance(list, null); return null; } + + @Override + public ChangeContainersResourceResponse changeContainersResource( + ChangeContainersResourceRequest request) throws YarnException, + IOException { + return null; + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java index 76384d3..8ae6a9d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java @@ -35,6 +35,8 @@ import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.ContainerManagementProtocolPB; +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; @@ -218,6 +220,13 @@ public StopContainersResponse stopContainers(StopContainersRequest request) new Exception(EXCEPTION_CAUSE)); throw new YarnException(e); } + + @Override + public ChangeContainersResourceResponse changeContainersResource( + ChangeContainersResourceRequest request) throws YarnException, + IOException { + return null; + } } public static ContainerTokenIdentifier newContainerTokenIdentifier( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestResourceChangeRPC.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestResourceChangeRPC.java new file mode 100644 index 0000000..61102ce --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestResourceChangeRPC.java @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; + +import junit.framework.Assert; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.yarn.api.ContainerManagementProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; +import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerResourceDecrease; +import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC; +import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.junit.Test; + +public class TestResourceChangeRPC { + + static final Log LOG = LogFactory.getLog(TestContainerLaunchRPC.class); + + @Test + public void testResourceChangeRPC() throws Exception { + testResourceChange(HadoopYarnProtoRPC.class.getName()); + } + + private void testResourceChange(String rpcClass) throws Exception { + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.IPC_RPC_IMPL, rpcClass); + YarnRPC rpc = YarnRPC.create(conf); + String bindAddr = "localhost:0"; + InetSocketAddress addr = NetUtils.createSocketAddr(bindAddr); + Server server = + rpc.getServer(ContainerManagementProtocol.class, + new DummyContainerManager(), addr, conf, null, 1); + server.start(); + try { + + ContainerManagementProtocol proxy = + (ContainerManagementProtocol) rpc.getProxy( + ContainerManagementProtocol.class, server.getListenerAddress(), + conf); + + List inc = new ArrayList(); + List dec = + new ArrayList(); + + for (int i = 0; i < 3; i++) { + inc.add(Token.newInstance("id".getBytes(), "pb", "passwd".getBytes(), + "service")); + } + for (int i = 0; i < 5; i++) { + dec.add(ContainerResourceDecrease.newInstance(null, null)); + } + + ChangeContainersResourceRequest req = + ChangeContainersResourceRequest.newInstance(inc, dec); + + try { + ChangeContainersResourceResponse res = + proxy.changeContainersResource(req); + Assert.assertEquals(inc.size(), res.getSucceedChangedContainers() + .size()); + Assert + .assertEquals(dec.size(), res.getFailedChangedContainers().size()); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } finally { + server.stop(); + } + } + + public class DummyContainerManager implements ContainerManagementProtocol { + + @Override + public StartContainersResponse startContainers( + StartContainersRequest requests) throws YarnException, IOException { + return null; + } + + @Override + public StopContainersResponse + stopContainers(StopContainersRequest requests) throws YarnException, + IOException { + return null; + } + + @Override + public GetContainerStatusesResponse getContainerStatuses( + GetContainerStatusesRequest request) throws YarnException, IOException { + return null; + } + + @Override + public ChangeContainersResourceResponse changeContainersResource( + ChangeContainersResourceRequest request) throws YarnException, + IOException { + ContainerId cid = ContainerId.newInstance(null, 0); + List inc = new ArrayList(); + List dec = new ArrayList(); + for (int i = 0; i < request.getContainersToIncrease().size(); i++) { + inc.add(cid); + } + for (int i = 0; i < request.getContainersToDecrease().size(); i++) { + dec.add(cid); + } + + ChangeContainersResourceResponse res = + ChangeContainersResourceResponse.newInstance(inc, dec); + return res; + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestChangeContainersResourceRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestChangeContainersResourceRequest.java new file mode 100644 index 0000000..3087c14 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestChangeContainersResourceRequest.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api; + +import java.util.ArrayList; +import java.util.List; + +import junit.framework.Assert; + +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ChangeContainersResourceRequestPBImpl; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerResourceDecrease; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.ChangeContainersResourceRequestProto; +import org.junit.Test; + +public class TestChangeContainersResourceRequest { + int id = 0; + + private ContainerResourceDecrease getNextResourceChangeContext() { + ContainerId containerId = + ContainerId.newInstance(ApplicationAttemptId.newInstance( + ApplicationId.newInstance(1234, 3), 3), id++); + Resource resource = Resource.newInstance(1023, 3); + ContainerResourceDecrease rcContext = + ContainerResourceDecrease.newInstance(containerId, resource); + return rcContext; + } + + @Test + public void testChangeContainersResourceRequest() { + List containerToIncrease = new ArrayList(); + List containerToDecrease = + new ArrayList(); + + for (int i = 0; i < 10; i++) { + Token ctx = + Token.newInstance("identifier".getBytes(), "simple", + "passwd".getBytes(), "service"); + containerToIncrease.add(ctx); + } + for (int i = 0; i < 5; i++) { + ContainerResourceDecrease ctx = getNextResourceChangeContext(); + containerToDecrease.add(ctx); + } + + ChangeContainersResourceRequest request = + ChangeContainersResourceRequest.newInstance(containerToIncrease, + containerToDecrease); + + // serde + ChangeContainersResourceRequestProto proto = + ((ChangeContainersResourceRequestPBImpl) request).getProto(); + request = new ChangeContainersResourceRequestPBImpl(proto); + + // check value + Assert.assertEquals(request.getContainersToIncrease().size(), + containerToIncrease.size()); + Assert.assertEquals(request.getContainersToDecrease().size(), + containerToDecrease.size()); + for (int i = 0; i < containerToDecrease.size(); i++) { + Assert.assertEquals(request.getContainersToDecrease().get(i), + containerToDecrease.get(i)); + } + } + + @Test + public void testChangeContainersResourceRequestWithNull() { + ChangeContainersResourceRequest request = + ChangeContainersResourceRequest.newInstance(null, null); + + // serde + ChangeContainersResourceRequestProto proto = + ((ChangeContainersResourceRequestPBImpl) request).getProto(); + request = new ChangeContainersResourceRequestPBImpl(proto); + + // check value + Assert.assertEquals(0, request.getContainersToIncrease().size()); + Assert.assertEquals(0, request.getContainersToDecrease().size()); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestChangeContainersResourceResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestChangeContainersResourceResponse.java new file mode 100644 index 0000000..2af49c1 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestChangeContainersResourceResponse.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api; + +import java.util.ArrayList; +import java.util.List; + +import junit.framework.Assert; + +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceResponse; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ChangeContainersResourceResponsePBImpl; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.ChangeContainersResourceResponseProto; +import org.junit.Test; + +public class TestChangeContainersResourceResponse { + int id = 0; + + private ContainerId getNextContainerId() { + ContainerId containerId = + ContainerId.newInstance(ApplicationAttemptId.newInstance( + ApplicationId.newInstance(1234, 3), 3), id++); + return containerId; + } + + @Test + public void testChangeContainersResourceRequest() { + List succeedChanged = new ArrayList(); + List failChanged = new ArrayList(); + + for (int i = 0; i < 10; i++) { + succeedChanged.add(getNextContainerId()); + } + + for (int i = 0; i < 8; i++) { + failChanged.add(getNextContainerId()); + } + + ChangeContainersResourceResponse response = + ChangeContainersResourceResponse.newInstance(succeedChanged, + failChanged); + + // serde + ChangeContainersResourceResponseProto proto = + ((ChangeContainersResourceResponsePBImpl) response).getProto(); + response = new ChangeContainersResourceResponsePBImpl(proto); + + // check value + Assert.assertEquals(response.getSucceedChangedContainers().size(), + succeedChanged.size()); + Assert.assertEquals(response.getFailedChangedContainers().size(), + failChanged.size()); + for (int i = 0; i < succeedChanged.size(); i++) { + Assert.assertEquals(response.getSucceedChangedContainers().get(i), + succeedChanged.get(i)); + } + for (int i = 0; i < failChanged.size(); i++) { + Assert.assertEquals(response.getFailedChangedContainers().get(i), + failChanged.get(i)); + } + } + + @Test + public void testChangeContainersResourceRequestWithNull() { + ChangeContainersResourceResponse request = + ChangeContainersResourceResponse.newInstance(null, null); + + // serde + ChangeContainersResourceResponseProto proto = + ((ChangeContainersResourceResponsePBImpl) request).getProto(); + request = new ChangeContainersResourceResponsePBImpl(proto); + + // check value + Assert.assertEquals(0, request.getSucceedChangedContainers().size()); + Assert.assertEquals(0, request.getFailedChangedContainers().size()); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index dd3deb3..52f7fb2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -55,6 +55,8 @@ import org.apache.hadoop.service.ServiceStateChangeListener; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; @@ -65,9 +67,11 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerResourceDecrease; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.SerializedException; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; @@ -112,6 +116,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.NonAggregatingLogHandler; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerChangeMonitoringEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl; @@ -907,4 +912,125 @@ public Context getContext() { public Map getAuxServiceMetaData() { return this.auxiliaryServices.getMetaData(); } + + @Private + @VisibleForTesting + protected void authorizeIncreaseRequest(NMTokenIdentifier nmTokenIdentifier, + ContainerTokenIdentifier containerTokenIdentifier) throws YarnException { + ContainerId containerId = containerTokenIdentifier.getContainerID(); + boolean unauthorized = false; + StringBuilder messageBuilder = + new StringBuilder("Unauthorized request to change container. "); + if (!nmTokenIdentifier.getApplicationAttemptId().equals( + containerId.getApplicationAttemptId())) { + unauthorized = true; + messageBuilder.append("\nNMToken for application attempt : ") + .append(nmTokenIdentifier.getApplicationAttemptId()) + .append(" was used for starting container with container token") + .append(" issued for application attempt : ") + .append(containerId.getApplicationAttemptId()); + } + if (unauthorized) { + String msg = messageBuilder.toString(); + LOG.error(msg); + throw RPCUtil.getRemoteException(msg); + } + } + + private void internalChangeContainerResource(ContainerId containerId, + Resource resource, List succeedChangedContainers, + List failedChangedContainers, + ContainerResourceDecrease decrease) throws YarnException { + // check container's existence + if (context.getContainers().get(containerId) == null) { + LOG.error("failed to increase size of container because" + + " this container not existed in this node:" + + containerId.toString()); + failedChangedContainers.add(containerId); + return; + } + + // check container's state + org.apache.hadoop.yarn.server.nodemanager. + containermanager.container.ContainerState currentState = + context.getContainers().get(containerId).getContainerState(); + if (currentState != org.apache.hadoop.yarn.server. + nodemanager.containermanager.container.ContainerState.RUNNING) { + LOG.warn("We can only increase size of container in" + + " RUNNING state, containerId=" + containerId.toString() + + " current state is:" + currentState.name()); + failedChangedContainers.add(containerId); + return; + } + + // create ChangeContainerMonitoringEvent + long pmemBytes = resource.getMemory() * 1024 * 1024L; + float pmemRatio = + getConfig().getFloat(YarnConfiguration.NM_VMEM_PMEM_RATIO, + YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO); + long vmemBytes = (long) (pmemRatio * pmemBytes); + ContainerChangeMonitoringEvent monitorEvent = + new ContainerChangeMonitoringEvent(containerId, vmemBytes, pmemBytes); + containersMonitor.handle(monitorEvent); + + // We can consider the change successful + succeedChangedContainers.add(containerId); + + // if it's a succeed decreased container, add it to context, this will be + // used by NodeStatusUpdater + if (null != decrease) { + if (!context.getDecreasedContainers().offer(decrease)) { + LOG.error("Failed to decreased container" + + " to queue in NMContext, this shouldn't happen."); + throw new YarnException("Failed to decreased container" + + " to queue in NMContext, this shouldn't happen."); + } + } + } + + @Override + public ChangeContainersResourceResponse changeContainersResource( + ChangeContainersResourceRequest request) throws YarnException, + IOException { + // get NMToken identifier + UserGroupInformation remoteUgi = getRemoteUgi(); + NMTokenIdentifier nmIdentifier = selectNMTokenIdentifier(remoteUgi); + + List succeedChangedContainers = new ArrayList(); + List failedChangedContainers = new ArrayList(); + + // loop all decrease request, try to change their resource limits + for (ContainerResourceDecrease r : request.getContainersToDecrease()) { + if (r.getContainerId() != null && r.getCapability() != null) { + internalChangeContainerResource(r.getContainerId(), r.getCapability(), + succeedChangedContainers, failedChangedContainers, r); + } + } + + // loop all increase request to check + for (org.apache.hadoop.yarn.api.records.Token token : request + .getContainersToIncrease()) { + ContainerTokenIdentifier containerTokenIdentifier = + BuilderUtils.newContainerTokenIdentifier(token); + + ContainerId containerId = containerTokenIdentifier.getContainerID(); + Resource resource = containerTokenIdentifier.getResource(); + try { + authorizeIncreaseRequest(nmIdentifier, containerTokenIdentifier); + } catch (YarnException e) { + failedChangedContainers.add(containerTokenIdentifier.getContainerID()); + continue; + } + + // change container monitoring size, pass null in last parameter to + // indicate it's not a decrease container request + internalChangeContainerResource(containerId, resource, + succeedChangedContainers, failedChangedContainers, null); + } + + ChangeContainersResourceResponse response = + ChangeContainersResourceResponse.newInstance(succeedChangedContainers, + failedChangedContainers); + return response; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java index a47e7f7..ebfc603 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java @@ -203,4 +203,10 @@ protected ContainerExecutor createContainerExecutor() { linuxContainerExecutor.setConf(super.conf); return linuxContainerExecutor; } + + @Override + public void testContainersChangeResource() { + LOG.info("not running this test."); + return; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java index f62cd50..6ba2371 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java @@ -40,6 +40,8 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.Service; import org.apache.hadoop.util.Shell; +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; @@ -791,6 +793,69 @@ public void testStartContainerFailureWithUnknownAuxService() throws Exception { Assert.assertTrue(response.getFailedRequests().get(cId).getMessage() .contains("The auxService:" + serviceName + " does not exist")); } + + @Test + public void testContainersChangeResource() throws Exception { + containerManager.start(); + + // first we will start containers 0..4 + List list = new ArrayList(); + ContainerLaunchContext containerLaunchContext = recordFactory + .newRecordInstance(ContainerLaunchContext.class); + for (int i = 0; i < 4; i++) { + ContainerId cId = createContainerId(i); + long identifier = DUMMY_RM_IDENTIFIER; + Token containerToken = createContainerToken(cId, identifier, + context.getNodeId(), user, context.getContainerTokenSecretManager()); + StartContainerRequest request = StartContainerRequest.newInstance( + containerLaunchContext, containerToken); + list.add(request); + } + StartContainersRequest requestList = StartContainersRequest + .newInstance(list); + + StartContainersResponse response = containerManager + .startContainers(requestList); + + Assert.assertEquals(4, response.getSuccessfullyStartedContainers().size()); + for (int i = 0; i < response.getSuccessfullyStartedContainers().size(); i++) { + Assert.assertEquals(i, response.getSuccessfullyStartedContainers().get(i) + .getId()); + } + + // we will create some change request, + List increaseTokens = new ArrayList(); + + // add change request for container-0, the resource not match because + // container's state in CM will not be RUNNING + ContainerId cId = createContainerId(0); + long identifier = DUMMY_RM_IDENTIFIER; + Token containerToken = + createContainerToken(cId, identifier, context.getNodeId(), user, + context.getContainerTokenSecretManager(), + Resource.newInstance(9999, 3)); + increaseTokens.add(containerToken); + + // add change request for container-7, the resource not match because + // container not existed + cId = createContainerId(7); + identifier = DUMMY_RM_IDENTIFIER; + containerToken = + createContainerToken(cId, identifier, context.getNodeId(), user, + context.getContainerTokenSecretManager(), + Resource.newInstance(9999, 3)); + increaseTokens.add(containerToken); + + ChangeContainersResourceRequest changeRequest = ChangeContainersResourceRequest + .newInstance(increaseTokens, null); + ChangeContainersResourceResponse changeResponse = containerManager + .changeContainersResource(changeRequest); + + // check response + Assert.assertEquals(2, changeResponse.getFailedChangedContainers().size()); + Assert.assertEquals(0, changeResponse.getFailedChangedContainers().get(0).getId()); + Assert.assertEquals(7, changeResponse.getFailedChangedContainers().get(1).getId()); + } public static Token createContainerToken(ContainerId cId, long rmIdentifier, NodeId nodeId, String user, @@ -807,4 +872,19 @@ public static Token createContainerToken(ContainerId cId, long rmIdentifier, containerTokenIdentifier); return containerToken; } + + public static Token createContainerToken(ContainerId cId, long rmIdentifier, + NodeId nodeId, String user, + NMContainerTokenSecretManager containerTokenSecretManager, Resource resource) + throws IOException { + ContainerTokenIdentifier containerTokenIdentifier = + new ContainerTokenIdentifier(cId, nodeId.toString(), user, resource, + System.currentTimeMillis() + 100000L, 123, rmIdentifier); + Token containerToken = + BuilderUtils + .newContainerToken(nodeId, containerTokenSecretManager + .retrievePassword(containerTokenIdentifier), + containerTokenIdentifier); + return containerToken; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java index c9e57a6..88bcf95 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java @@ -31,6 +31,8 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; @@ -308,4 +310,11 @@ synchronized public GetContainerStatusesResponse getContainerStatuses( nodeStatus.setNodeHealthStatus(nodeHealthStatus); return nodeStatus; } + + @Override + public ChangeContainersResourceResponse changeContainersResource( + ChangeContainersResourceRequest request) throws YarnException, + IOException { + return null; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAMAuthorization.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAMAuthorization.java index a9f1c1a..6f9a80f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAMAuthorization.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAMAuthorization.java @@ -39,6 +39,8 @@ import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; @@ -129,6 +131,13 @@ public Credentials getContainerCredentials() throws IOException { credentials.readTokenStorageStream(buf); return credentials; } + + @Override + public ChangeContainersResourceResponse changeContainersResource( + ChangeContainersResourceRequest request) throws YarnException, + IOException { + return null; + } } public static class MockRMWithAMS extends MockRMWithCustomAMLauncher { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java index 64e5cc9..90fe1bf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java @@ -29,6 +29,8 @@ import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; @@ -118,6 +120,13 @@ public GetContainerStatusesResponse getContainerStatuses( GetContainerStatusesRequest request) throws YarnException { return null; } + + @Override + public ChangeContainersResourceResponse changeContainersResource( + ChangeContainersResourceRequest request) throws YarnException, + IOException { + return null; + } } @Test