diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java index ee0544a..cb6216b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java @@ -81,6 +81,7 @@ import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceChangeContext; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java index dd1060c..73e01d1 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java @@ -54,6 +54,8 @@ import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; @@ -71,6 +73,7 @@ import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy; import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy.ContainerManagementProtocolProxyData; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC; @@ -438,5 +441,13 @@ public StopContainersResponse stopContainers(StopContainersRequest request) "Dummy function cause")); throw new IOException(e); } + + @Override + public ChangeContainersResourceResponse changeContainersResource( + ChangeContainersResourceRequest request) throws YarnException, + IOException { + // TODO Auto-generated method stub + return null; + } } } \ No newline at end of file diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java index 6f21c87..c345e77 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java @@ -45,6 +45,8 @@ import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher.EventType; import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; @@ -443,6 +445,13 @@ public GetContainerStatusesResponse getContainerStatuses( GetContainerStatusesRequest request) throws IOException { return null; } + @Override + public ChangeContainersResourceResponse changeContainersResource( + ChangeContainersResourceRequest request) throws YarnException, + IOException { + // TODO Auto-generated method stub + return null; + } } @SuppressWarnings("serial") diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ContainerManagementProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ContainerManagementProtocol.java index 7aa43df..8d84789 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ContainerManagementProtocol.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ContainerManagementProtocol.java @@ -22,6 +22,8 @@ import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; @@ -170,4 +172,16 @@ StopContainersResponse stopContainers(StopContainersRequest request) GetContainerStatusesResponse getContainerStatuses( GetContainerStatusesRequest request) throws YarnException, IOException; + + /** + * The API used by the ApplicationMaster to request change + * running containers in this node + * + * @throws YarnException + * @throws IOException + */ + @Public + ChangeContainersResourceResponse changeContainersResource( + ChangeContainersResourceRequest request) throws YarnException, + IOException; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ChangeContainersResourceRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ChangeContainersResourceRequest.java new file mode 100644 index 0000000..747734c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ChangeContainersResourceRequest.java @@ -0,0 +1,45 @@ +package org.apache.hadoop.yarn.api.protocolrecords; + +import java.util.List; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.yarn.api.ContainerManagementProtocol; +import org.apache.hadoop.yarn.api.records.ResourceChangeContext; +import org.apache.hadoop.yarn.api.records.ResourceIncreaseContext; +import org.apache.hadoop.yarn.util.Records; + +/** + *

+ * The request sent by ApplicationMaster to ask + * NodeManager change resource quota used by a specified container + *

+ * + * @see ContainerManagementProtocol#changeContainersResource(ChangeContainersResourceRequest) + */ +@Public +public abstract class ChangeContainersResourceRequest { + @Public + public static ChangeContainersResourceRequest newInstance( + List containersToIncrease, + List containersToDecrease) { + ChangeContainersResourceRequest request = Records + .newRecord(ChangeContainersResourceRequest.class); + request.setContainersToIncrease(containersToIncrease); + request.setContainersToDecrease(containersToDecrease); + return request; + } + + @Public + public abstract List getContainersToIncrease(); + + @Public + public abstract void setContainersToIncrease( + List containersToIncrease); + + @Public + public abstract List getContainersToDecrease(); + + @Public + public abstract void setContainersToDecrease( + List containersToDecrease); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ChangeContainersResourceResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ChangeContainersResourceResponse.java new file mode 100644 index 0000000..5b35249 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ChangeContainersResourceResponse.java @@ -0,0 +1,35 @@ +package org.apache.hadoop.yarn.api.protocolrecords; + +import java.util.List; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.util.Records; + +@Public +public abstract class ChangeContainersResourceResponse { + @Public + public static ChangeContainersResourceResponse newInstance( + List succeedChangedContainers, + List failedChangedContainers) { + ChangeContainersResourceResponse request = Records + .newRecord(ChangeContainersResourceResponse.class); + request.setSucceedChangedContainers(succeedChangedContainers); + request.setFailedChangedContainers(failedChangedContainers); + return request; + } + + @Public + public abstract List getSucceedChangedContainers(); + + @Public + public abstract void setSucceedChangedContainers( + List succeedIncreasedContainers); + + @Public + public abstract List getFailedChangedContainers(); + + @Public + public abstract void setFailedChangedContainers( + List succeedIncreasedContainers); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/containermanagement_protocol.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/containermanagement_protocol.proto index 7b1647b..dd7874a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/containermanagement_protocol.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/containermanagement_protocol.proto @@ -34,4 +34,5 @@ service ContainerManagementProtocolService { rpc startContainers(StartContainersRequestProto) returns (StartContainersResponseProto); rpc stopContainers(StopContainersRequestProto) returns (StopContainersResponseProto); rpc getContainerStatuses(GetContainerStatusesRequestProto) returns (GetContainerStatusesResponseProto); + rpc changeContainersResource(ChangeContainersResourceRequestProto) returns (ChangeContainersResourceResponseProto); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index cf7a5cd..17a3359 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -229,3 +229,13 @@ message GetContainerStatusesResponseProto { repeated ContainerStatusProto status = 1; repeated ContainerExceptionMapProto failed_requests = 2; } + +message ChangeContainersResourceRequestProto { + repeated ResourceIncreaseContextProto increase_containers = 1; + repeated ResourceChangeContextProto decrease_containers = 2; +} + +message ChangeContainersResourceResponseProto { + repeated ContainerIdProto succeed_changed_containers = 1; + repeated ContainerIdProto failed_changed_containers = 2; +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagementProtocolPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagementProtocolPBClientImpl.java index 15397e3..63e3612 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagementProtocolPBClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagementProtocolPBClientImpl.java @@ -30,12 +30,16 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.ContainerManagementProtocolPB; +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ChangeContainersResourceRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ChangeContainersResourceResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerStatusesRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerStatusesResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainersRequestPBImpl; @@ -45,6 +49,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.ipc.RPCUtil; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.ChangeContainersResourceRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerStatusesRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainersRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.StopContainersRequestProto; @@ -128,4 +133,17 @@ public GetContainerStatusesResponse getContainerStatuses( return null; } } + + @Override + public ChangeContainersResourceResponse changeContainersResource( + ChangeContainersResourceRequest request) throws YarnException, + IOException { + ChangeContainersResourceRequestProto requestProto = ((ChangeContainersResourceRequestPBImpl)request).getProto(); + try { + return new ChangeContainersResourceResponsePBImpl(proxy.changeContainersResource(null, requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + return null; + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ContainerManagementProtocolPBServiceImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ContainerManagementProtocolPBServiceImpl.java index 2d33e69..55a9d33 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ContainerManagementProtocolPBServiceImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ContainerManagementProtocolPBServiceImpl.java @@ -23,9 +23,12 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.ContainerManagementProtocolPB; +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ChangeContainersResourceRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ChangeContainersResourceResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerStatusesRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerStatusesResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainersRequestPBImpl; @@ -33,6 +36,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StopContainersRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StopContainersResponsePBImpl; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.ChangeContainersResourceRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.ChangeContainersResourceResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerStatusesRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerStatusesResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainersRequestProto; @@ -94,4 +99,19 @@ public GetContainerStatusesResponseProto getContainerStatuses( throw new ServiceException(e); } } + + @Override + public ChangeContainersResourceResponseProto changeContainersResource( + RpcController controller, ChangeContainersResourceRequestProto proto) + throws ServiceException { + ChangeContainersResourceRequestPBImpl request = new ChangeContainersResourceRequestPBImpl(proto); + try { + ChangeContainersResourceResponse response = real.changeContainersResource(request); + return ((ChangeContainersResourceResponsePBImpl)response).getProto(); + } catch (YarnException e) { + throw new ServiceException(e); + } catch (IOException e) { + throw new ServiceException(e); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/ChangeContainersResourceRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/ChangeContainersResourceRequestPBImpl.java new file mode 100644 index 0000000..f5be868 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/ChangeContainersResourceRequestPBImpl.java @@ -0,0 +1,241 @@ +package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceRequest; +import org.apache.hadoop.yarn.api.records.ResourceChangeContext; +import org.apache.hadoop.yarn.api.records.ResourceIncreaseContext; +import org.apache.hadoop.yarn.api.records.impl.pb.ResourceChangeContextPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ResourceIncreaseContextPBImpl; +import org.apache.hadoop.yarn.proto.YarnProtos.ResourceChangeContextProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ResourceIncreaseContextProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.ChangeContainersResourceRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.ChangeContainersResourceRequestProtoOrBuilder; + +import com.google.protobuf.TextFormat; + +@Private +public class ChangeContainersResourceRequestPBImpl extends + ChangeContainersResourceRequest { + ChangeContainersResourceRequestProto proto = ChangeContainersResourceRequestProto + .getDefaultInstance(); + ChangeContainersResourceRequestProto.Builder builder = null; + boolean viaProto = false; + + private List containersToIncrease = null; + private List containersToDecrease = null; + + public ChangeContainersResourceRequestPBImpl() { + builder = ChangeContainersResourceRequestProto.newBuilder(); + } + + public ChangeContainersResourceRequestPBImpl( + ChangeContainersResourceRequestProto proto) { + this.proto = proto; + viaProto = true; + } + + public ChangeContainersResourceRequestProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) { + return false; + } + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + + private void mergeLocalToBuilder() { + if (this.containersToIncrease != null) { + addIncreaseContainersToProto(); + } + if (this.containersToDecrease != null) { + addDecreaseContainersToProto(); + } + } + + private void mergeLocalToProto() { + if (viaProto) { + maybeInitBuilder(); + } + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = ChangeContainersResourceRequestProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public List getContainersToIncrease() { + initContainersToIncrease(); + return this.containersToIncrease; + } + + @Override + public void setContainersToIncrease( + List containersToIncrease) { + if (containersToIncrease == null) { + return; + } + initContainersToIncrease(); + this.containersToIncrease.clear(); + this.containersToIncrease.addAll(containersToIncrease); + } + + @Override + public List getContainersToDecrease() { + initContainersToDecrease(); + return this.containersToDecrease; + } + + @Override + public void setContainersToDecrease( + List containersToDecrease) { + if (containersToDecrease == null) { + return; + } + initContainersToDecrease(); + this.containersToDecrease.clear(); + this.containersToDecrease.addAll(containersToDecrease); + } + + private void initContainersToIncrease() { + if (this.containersToIncrease != null) { + return; + } + ChangeContainersResourceRequestProtoOrBuilder p = viaProto ? proto + : builder; + List list = p.getIncreaseContainersList(); + this.containersToIncrease = new ArrayList(); + + for (ResourceIncreaseContextProto c : list) { + this.containersToIncrease.add(convertFromProtoFormat(c)); + } + } + + private void addIncreaseContainersToProto() { + maybeInitBuilder(); + builder.clearIncreaseContainers(); + if (this.containersToIncrease == null) { + return; + } + Iterable iterable = new Iterable() { + @Override + public Iterator iterator() { + return new Iterator() { + Iterator iter = containersToIncrease + .iterator(); + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public ResourceIncreaseContextProto next() { + return convertToProtoFormat(iter.next()); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }; + builder.addAllIncreaseContainers(iterable); + } + + private void initContainersToDecrease() { + if (this.containersToDecrease != null) { + return; + } + ChangeContainersResourceRequestProtoOrBuilder p = viaProto ? proto + : builder; + List list = p.getDecreaseContainersList(); + this.containersToDecrease = new ArrayList(); + + for (ResourceChangeContextProto c : list) { + this.containersToDecrease.add(convertFromProtoFormat(c)); + } + } + + private void addDecreaseContainersToProto() { + maybeInitBuilder(); + builder.clearDecreaseContainers(); + if (this.containersToDecrease == null) { + return; + } + Iterable iterable = new Iterable() { + @Override + public Iterator iterator() { + return new Iterator() { + Iterator iter = containersToDecrease + .iterator(); + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public ResourceChangeContextProto next() { + return convertToProtoFormat(iter.next()); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }; + builder.addAllDecreaseContainers(iterable); + } + + private ResourceChangeContext convertFromProtoFormat( + ResourceChangeContextProto p) { + return new ResourceChangeContextPBImpl(p); + } + + private ResourceChangeContextProto convertToProtoFormat( + ResourceChangeContext t) { + return ((ResourceChangeContextPBImpl) t).getProto(); + } + + private ResourceIncreaseContext convertFromProtoFormat( + ResourceIncreaseContextProto p) { + return new ResourceIncreaseContextPBImpl(p); + } + + private ResourceIncreaseContextProto convertToProtoFormat( + ResourceIncreaseContext t) { + return ((ResourceIncreaseContextPBImpl) t).getProto(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/ChangeContainersResourceResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/ChangeContainersResourceResponsePBImpl.java new file mode 100644 index 0000000..48e13e7 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/ChangeContainersResourceResponsePBImpl.java @@ -0,0 +1,220 @@ +package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceResponse; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.ChangeContainersResourceResponseProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.ChangeContainersResourceResponseProtoOrBuilder; + +import com.google.protobuf.TextFormat; + +public class ChangeContainersResourceResponsePBImpl extends + ChangeContainersResourceResponse { + ChangeContainersResourceResponseProto proto = ChangeContainersResourceResponseProto + .getDefaultInstance(); + ChangeContainersResourceResponseProto.Builder builder = null; + boolean viaProto = false; + + private List succeedChangedContainers = null; + private List failedChangedContainers = null; + + public ChangeContainersResourceResponsePBImpl() { + builder = ChangeContainersResourceResponseProto.newBuilder(); + } + + public ChangeContainersResourceResponsePBImpl( + ChangeContainersResourceResponseProto proto) { + this.proto = proto; + viaProto = true; + } + + public ChangeContainersResourceResponseProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) { + return false; + } + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + + private void mergeLocalToBuilder() { + if (this.succeedChangedContainers != null) { + addSucceedChangedContainersToProto(); + } + if (this.failedChangedContainers != null) { + addFailedChangedContainersToProto(); + } + } + + private void mergeLocalToProto() { + if (viaProto) { + maybeInitBuilder(); + } + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = ChangeContainersResourceResponseProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public List getSucceedChangedContainers() { + initSucceedChangedContainers(); + return this.succeedChangedContainers; + } + + @Override + public void setSucceedChangedContainers(List containers) { + if (containers == null) { + return; + } + initSucceedChangedContainers(); + this.succeedChangedContainers.clear(); + this.succeedChangedContainers.addAll(containers); + } + + private void initSucceedChangedContainers() { + if (this.succeedChangedContainers != null) { + return; + } + ChangeContainersResourceResponseProtoOrBuilder p = viaProto ? proto + : builder; + List list = p.getSucceedChangedContainersList(); + this.succeedChangedContainers = new ArrayList(); + + for (ContainerIdProto c : list) { + this.succeedChangedContainers.add(convertFromProtoFormat(c)); + } + } + + private void addSucceedChangedContainersToProto() { + maybeInitBuilder(); + builder.clearSucceedChangedContainers(); + if (this.succeedChangedContainers == null) { + return; + } + Iterable iterable = new Iterable() { + @Override + public Iterator iterator() { + return new Iterator() { + Iterator iter = succeedChangedContainers.iterator(); + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public ContainerIdProto next() { + return convertToProtoFormat(iter.next()); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }; + builder.addAllSucceedChangedContainers(iterable); + } + + @Override + public List getFailedChangedContainers() { + initFailedChangedContainers(); + return this.failedChangedContainers; + } + + @Override + public void setFailedChangedContainers(List containers) { + if (containers == null) { + return; + } + initFailedChangedContainers(); + this.failedChangedContainers.clear(); + this.failedChangedContainers.addAll(containers); + } + + private void initFailedChangedContainers() { + if (this.failedChangedContainers != null) { + return; + } + ChangeContainersResourceResponseProtoOrBuilder p = viaProto ? proto + : builder; + List list = p.getFailedChangedContainersList(); + this.failedChangedContainers = new ArrayList(); + + for (ContainerIdProto c : list) { + this.failedChangedContainers.add(convertFromProtoFormat(c)); + } + } + + private void addFailedChangedContainersToProto() { + maybeInitBuilder(); + builder.clearFailedChangedContainers(); + if (this.failedChangedContainers == null) { + return; + } + Iterable iterable = new Iterable() { + @Override + public Iterator iterator() { + return new Iterator() { + Iterator iter = failedChangedContainers.iterator(); + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public ContainerIdProto next() { + return convertToProtoFormat(iter.next()); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }; + builder.addAllFailedChangedContainers(iterable); + } + + private ContainerId convertFromProtoFormat(ContainerIdProto p) { + return new ContainerIdPBImpl(p); + } + + private ContainerIdProto convertToProtoFormat(ContainerId t) { + return ((ContainerIdPBImpl) t).getProto(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java index 8fe5c3c..866e206 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java @@ -33,6 +33,8 @@ import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; @@ -165,5 +167,12 @@ public GetContainerStatusesResponse getContainerStatuses( GetContainerStatusesResponse.newInstance(list, null); return null; } + + @Override + public ChangeContainersResourceResponse changeContainersResource( + ChangeContainersResourceRequest request) throws YarnException, + IOException { + return null; + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java index 76384d3..6cf555e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java @@ -35,6 +35,8 @@ import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.ContainerManagementProtocolPB; +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; @@ -218,6 +220,14 @@ public StopContainersResponse stopContainers(StopContainersRequest request) new Exception(EXCEPTION_CAUSE)); throw new YarnException(e); } + + @Override + public ChangeContainersResourceResponse changeContainersResource( + ChangeContainersResourceRequest request) throws YarnException, + IOException { + // TODO Auto-generated method stub + return null; + } } public static ContainerTokenIdentifier newContainerTokenIdentifier( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestResourceChangeRPC.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestResourceChangeRPC.java new file mode 100644 index 0000000..9f73e3b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestResourceChangeRPC.java @@ -0,0 +1,125 @@ +package org.apache.hadoop.yarn; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; + +import junit.framework.Assert; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.yarn.api.ContainerManagementProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; +import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ResourceChangeContext; +import org.apache.hadoop.yarn.api.records.ResourceIncreaseContext; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC; +import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.junit.Test; + +public class TestResourceChangeRPC { + + static final Log LOG = LogFactory.getLog(TestContainerLaunchRPC.class); + + @Test + public void testResourceChangeRPC() throws Exception { + testResourceChange(HadoopYarnProtoRPC.class.getName()); + } + + private void testResourceChange(String rpcClass) throws Exception { + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.IPC_RPC_IMPL, rpcClass); + YarnRPC rpc = YarnRPC.create(conf); + String bindAddr = "localhost:0"; + InetSocketAddress addr = NetUtils.createSocketAddr(bindAddr); + Server server = rpc.getServer(ContainerManagementProtocol.class, + new DummyContainerManager(), addr, conf, null, 1); + server.start(); + try { + + ContainerManagementProtocol proxy = (ContainerManagementProtocol) rpc + .getProxy(ContainerManagementProtocol.class, + server.getListenerAddress(), conf); + + List inc = new ArrayList(); + List dec = new ArrayList(); + + for (int i = 0; i < 3; i++) { + inc.add(ResourceIncreaseContext.newInstance(null, null)); + } + for (int i = 0; i < 5; i++) { + dec.add(ResourceChangeContext.newInstance(null, null)); + } + + ChangeContainersResourceRequest req = ChangeContainersResourceRequest + .newInstance(inc, dec); + + try { + ChangeContainersResourceResponse res = proxy + .changeContainersResource(req); + Assert.assertEquals(inc.size(), res.getSucceedChangedContainers() + .size()); + Assert.assertEquals(dec.size(), res.getFailedChangedContainers() + .size()); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } finally { + server.stop(); + } + } + + public class DummyContainerManager implements ContainerManagementProtocol { + + @Override + public StartContainersResponse startContainers( + StartContainersRequest requests) throws YarnException, IOException { + return null; + } + + @Override + public StopContainersResponse stopContainers(StopContainersRequest requests) + throws YarnException, IOException { + return null; + } + + @Override + public GetContainerStatusesResponse getContainerStatuses( + GetContainerStatusesRequest request) throws YarnException, IOException { + return null; + } + + @Override + public ChangeContainersResourceResponse changeContainersResource( + ChangeContainersResourceRequest request) throws YarnException, + IOException { + ContainerId cid = ContainerId.newInstance(null, 0); + List inc = new ArrayList(); + List dec = new ArrayList(); + for (int i = 0; i < request.getContainersToIncrease().size(); i++) { + inc.add(cid); + } + for (int i = 0; i < request.getContainersToDecrease().size(); i++) { + dec.add(cid); + } + + ChangeContainersResourceResponse res = ChangeContainersResourceResponse + .newInstance(inc, dec); + return res; + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestChangeContainersResourceRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestChangeContainersResourceRequest.java new file mode 100644 index 0000000..12d3445 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestChangeContainersResourceRequest.java @@ -0,0 +1,82 @@ +package org.apache.hadoop.yarn.api; + +import java.util.ArrayList; +import java.util.List; + +import junit.framework.Assert; + +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ChangeContainersResourceRequestPBImpl; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceChangeContext; +import org.apache.hadoop.yarn.api.records.ResourceIncreaseContext; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.ChangeContainersResourceRequestProto; +import org.junit.Test; + +public class TestChangeContainersResourceRequest { + int id = 0; + + private ResourceChangeContext getNextResourceChangeContext() { + ContainerId containerId = ContainerId + .newInstance(ApplicationAttemptId.newInstance( + ApplicationId.newInstance(1234, 3), 3), id++); + Resource resource = Resource.newInstance(1023, 3); + ResourceChangeContext rcContext = ResourceChangeContext.newInstance( + containerId, resource); + return rcContext; + } + + @Test + public void testChangeContainersResourceRequest() { + List containerToIncrease = new ArrayList(); + List containerToDecrease = new ArrayList(); + + for (int i = 0; i < 10; i++) { + ResourceIncreaseContext ctx = ResourceIncreaseContext.newInstance( + getNextResourceChangeContext(), null); + containerToIncrease.add(ctx); + } + for (int i = 0; i < 5; i++) { + ResourceChangeContext ctx = getNextResourceChangeContext(); + containerToDecrease.add(ctx); + } + + ChangeContainersResourceRequest request = ChangeContainersResourceRequest + .newInstance(containerToIncrease, containerToDecrease); + + // serde + ChangeContainersResourceRequestProto proto = ((ChangeContainersResourceRequestPBImpl) request) + .getProto(); + request = new ChangeContainersResourceRequestPBImpl(proto); + + // check value + Assert.assertEquals(request.getContainersToIncrease().size(), + containerToIncrease.size()); + Assert.assertEquals(request.getContainersToDecrease().size(), + containerToDecrease.size()); + for (int i = 0; i < containerToIncrease.size(); i++) { + Assert.assertTrue(request.getContainersToIncrease().get(i) + .equals(containerToIncrease.get(i))); + } + for (int i = 0; i < containerToDecrease.size(); i++) { + Assert.assertTrue(request.getContainersToDecrease().get(i) + .equals(containerToDecrease.get(i))); + } + } + + @Test + public void testChangeContainersResourceRequestWithNull() { + ChangeContainersResourceRequest request = ChangeContainersResourceRequest.newInstance(null, null); + + // serde + ChangeContainersResourceRequestProto proto = ((ChangeContainersResourceRequestPBImpl)request).getProto(); + request = new ChangeContainersResourceRequestPBImpl(proto); + + // check value + Assert.assertEquals(0, request.getContainersToIncrease().size()); + Assert.assertEquals(0, request.getContainersToDecrease().size()); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestChangeContainersResourceResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestChangeContainersResourceResponse.java new file mode 100644 index 0000000..5ee4b16 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestChangeContainersResourceResponse.java @@ -0,0 +1,74 @@ +package org.apache.hadoop.yarn.api; + +import java.util.ArrayList; +import java.util.List; + +import junit.framework.Assert; + +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceResponse; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ChangeContainersResourceResponsePBImpl; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.ChangeContainersResourceResponseProto; +import org.junit.Test; + +public class TestChangeContainersResourceResponse { + int id = 0; + + private ContainerId getNextContainerId() { + ContainerId containerId = ContainerId + .newInstance(ApplicationAttemptId.newInstance( + ApplicationId.newInstance(1234, 3), 3), id++); + return containerId; + } + + @Test + public void testChangeContainersResourceRequest() { + List succeedChanged = new ArrayList(); + List failChanged = new ArrayList(); + + for (int i = 0; i < 10; i++) { + succeedChanged.add(getNextContainerId()); + } + + for (int i = 0; i < 8; i++) { + failChanged.add(getNextContainerId()); + } + + ChangeContainersResourceResponse response = ChangeContainersResourceResponse + .newInstance(succeedChanged, failChanged); + + // serde + ChangeContainersResourceResponseProto proto = ((ChangeContainersResourceResponsePBImpl)response) + .getProto(); + response = new ChangeContainersResourceResponsePBImpl(proto); + + // check value + Assert.assertEquals(response.getSucceedChangedContainers().size(), + succeedChanged.size()); + Assert.assertEquals(response.getFailedChangedContainers().size(), + failChanged.size()); + for (int i = 0; i < succeedChanged.size(); i++) { + Assert.assertTrue(response.getSucceedChangedContainers().get(i) + .equals(succeedChanged.get(i))); + } + for (int i = 0; i < failChanged.size(); i++) { + Assert.assertTrue(response.getFailedChangedContainers().get(i) + .equals(failChanged.get(i))); + } + } + + @Test + public void testChangeContainersResourceRequestWithNull() { + ChangeContainersResourceResponse request = ChangeContainersResourceResponse.newInstance(null, null); + + // serde + ChangeContainersResourceResponseProto proto = ((ChangeContainersResourceResponsePBImpl)request).getProto(); + request = new ChangeContainersResourceResponsePBImpl(proto); + + // check value + Assert.assertEquals(0, request.getSucceedChangedContainers().size()); + Assert.assertEquals(0, request.getFailedChangedContainers().size()); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java index aad819d..e4dde47 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java @@ -23,7 +23,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.util.Records; - +import org.apache.hadoop.yarn.api.records.ResourceChangeContext; public abstract class NodeStatus { @@ -55,4 +55,9 @@ public abstract void setContainersStatuses( public abstract void setNodeId(NodeId nodeId); public abstract void setResponseId(int responseId); + + public abstract void setNewDecreasedContainers( + List decreasedContainers); + + public abstract List getNewsDecreasedContainers(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java index 65376dc..8a5c25c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java @@ -26,12 +26,16 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.ResourceChangeContext; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase; +import org.apache.hadoop.yarn.api.records.impl.pb.ResourceChangeContextPBImpl; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ResourceChangeContextProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeHealthStatusProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProtoOrBuilder; @@ -48,6 +52,7 @@ private List containers = null; private NodeHealthStatus nodeHealthStatus = null; private List keepAliveApplications = null; + private List newDecreasedContainers = null; public NodeStatusPBImpl() { builder = NodeStatusProto.newBuilder(); @@ -78,6 +83,9 @@ private synchronized void mergeLocalToBuilder() { if (this.keepAliveApplications != null) { addKeepAliveApplicationsToProto(); } + if (this.newDecreasedContainers != null) { + addNewDecreasedContainersToProto(); + } } private synchronized void mergeLocalToProto() { @@ -163,6 +171,41 @@ public void remove() { }; builder.addAllKeepAliveApplications(iterable); } + + private synchronized void addNewDecreasedContainersToProto() { + maybeInitBuilder(); + builder.clearNewDecreasedContainers(); + if (newDecreasedContainers == null) + return; + Iterable iterable = new Iterable() { + @Override + public Iterator iterator() { + return new Iterator() { + + Iterator iter = newDecreasedContainers.iterator(); + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public ResourceChangeContextProto next() { + return convertToProtoFormat(iter.next()); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + + } + }; + + } + }; + builder.addAllNewDecreasedContainers(iterable); + } + @Override public int hashCode() { @@ -291,6 +334,35 @@ public synchronized void setNodeHealthStatus(NodeHealthStatus healthStatus) { this.nodeHealthStatus = healthStatus; } + @Override + public void setNewDecreasedContainers( + List decreasedContainers) { + if (decreasedContainers == null) { + builder.clearNewDecreasedContainers(); + } + this.newDecreasedContainers = decreasedContainers; + } + + @Override + public List getNewsDecreasedContainers() { + initDecreasedContainers(); + return this.newDecreasedContainers; + } + + private synchronized void initDecreasedContainers() { + if (this.newDecreasedContainers != null) { + return; + } + NodeStatusProtoOrBuilder p = viaProto ? proto : builder; + List list = p.getNewDecreasedContainersList(); + this.newDecreasedContainers = new ArrayList(); + + for (ResourceChangeContextProto c : list) { + this.newDecreasedContainers.add(convertFromProtoFormat(c)); + } + + } + private NodeIdProto convertToProtoFormat(NodeId nodeId) { return ((NodeIdPBImpl)nodeId).getProto(); } @@ -316,6 +388,14 @@ private ContainerStatusProto convertToProtoFormat(ContainerStatus c) { return ((ContainerStatusPBImpl)c).getProto(); } + private ResourceChangeContextProto convertToProtoFormat(ResourceChangeContext t) { + return ((ResourceChangeContextPBImpl)t).getProto(); + } + + private ResourceChangeContext convertFromProtoFormat(ResourceChangeContextProto p) { + return new ResourceChangeContextPBImpl(p); + } + private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto c) { return new ApplicationIdPBImpl(c); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto index 4f5d168..c42ad40 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto @@ -36,6 +36,7 @@ message NodeStatusProto { repeated ContainerStatusProto containersStatuses = 3; optional NodeHealthStatusProto nodeHealthStatus = 4; repeated ApplicationIdProto keep_alive_applications = 5; + repeated ResourceChangeContextProto new_decreased_containers = 6; } message MasterKeyProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestNodeStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestNodeStatus.java new file mode 100644 index 0000000..7388a97 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestNodeStatus.java @@ -0,0 +1,49 @@ +package org.apache.hadoop.yarn.server.api.protocolrecords; + +import java.util.ArrayList; +import java.util.List; + +import junit.framework.Assert; + +import org.apache.hadoop.yarn.api.records.ResourceChangeContext; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; +import org.apache.hadoop.yarn.server.api.records.impl.pb.NodeStatusPBImpl; +import org.junit.Test; + +public class TestNodeStatus { + @Test + public void testNodeStatusWithDecreasedContainers() { + RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); + NodeStatus nodeStatus = recordFactory.newRecordInstance(NodeStatus.class); + + // add 3 instances + List list = new ArrayList(); + list.add(ResourceChangeContext.newInstance(null, null)); + list.add(ResourceChangeContext.newInstance(null, null)); + list.add(ResourceChangeContext.newInstance(null, null)); + nodeStatus.setNewDecreasedContainers(list); + + // serde + NodeStatusProto proto = ((NodeStatusPBImpl)nodeStatus).getProto(); + nodeStatus = new NodeStatusPBImpl(proto); + + // check value + Assert.assertEquals(list.size(), nodeStatus.getNewsDecreasedContainers().size()); + } + + @Test + public void testNodeStatusWithoutDecreasedContainers() { + RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); + NodeStatus nodeStatus = recordFactory.newRecordInstance(NodeStatus.class); + + // serde + NodeStatusProto proto = ((NodeStatusPBImpl)nodeStatus).getProto(); + nodeStatus = new NodeStatusPBImpl(proto); + + // check value + Assert.assertEquals(0, nodeStatus.getNewsDecreasedContainers().size()); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index dd3deb3..3109302 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -55,6 +55,8 @@ import org.apache.hadoop.service.ServiceStateChangeListener; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; @@ -68,6 +70,9 @@ import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceChangeContext; +import org.apache.hadoop.yarn.api.records.ResourceIncreaseContext; import org.apache.hadoop.yarn.api.records.SerializedException; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; @@ -907,4 +912,12 @@ public Context getContext() { public Map getAuxServiceMetaData() { return this.auxiliaryServices.getMetaData(); } + + @Override + public ChangeContainersResourceResponse changeContainersResource( + ChangeContainersResourceRequest request) throws YarnException, + IOException { + // TODO Auto-generated method stub + return null; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java index c9e57a6..f0c0d08 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java @@ -31,6 +31,8 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; @@ -308,4 +310,12 @@ synchronized public GetContainerStatusesResponse getContainerStatuses( nodeStatus.setNodeHealthStatus(nodeHealthStatus); return nodeStatus; } + + @Override + public ChangeContainersResourceResponse changeContainersResource( + ChangeContainersResourceRequest request) throws YarnException, + IOException { + // TODO Auto-generated method stub + return null; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAMAuthorization.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAMAuthorization.java index a9f1c1a..6f9a80f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAMAuthorization.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAMAuthorization.java @@ -39,6 +39,8 @@ import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; @@ -129,6 +131,13 @@ public Credentials getContainerCredentials() throws IOException { credentials.readTokenStorageStream(buf); return credentials; } + + @Override + public ChangeContainersResourceResponse changeContainersResource( + ChangeContainersResourceRequest request) throws YarnException, + IOException { + return null; + } } public static class MockRMWithAMS extends MockRMWithCustomAMLauncher { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java index 64e5cc9..90fe1bf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java @@ -29,6 +29,8 @@ import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; @@ -118,6 +120,13 @@ public GetContainerStatusesResponse getContainerStatuses( GetContainerStatusesRequest request) throws YarnException { return null; } + + @Override + public ChangeContainersResourceResponse changeContainersResource( + ChangeContainersResourceRequest request) throws YarnException, + IOException { + return null; + } } @Test