diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java index 184f1b2..233ba2f 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java @@ -46,6 +46,8 @@ import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher.EventType; import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; @@ -454,6 +456,13 @@ public GetContainerStatusesResponse getContainerStatuses( } @Override + public IncreaseContainersResourceResponse increaseContainersResource( + IncreaseContainersResourceRequest request) throws YarnException, + IOException { + return null; + } + + @Override public void close() throws IOException { } } diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java index 41ee65d..4e88daa 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java @@ -30,6 +30,8 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse; import org.junit.Assert; import org.apache.commons.logging.Log; @@ -449,5 +451,14 @@ public StopContainersResponse stopContainers(StopContainersRequest request) "Dummy function cause")); throw new IOException(e); } + + @Override + public IncreaseContainersResourceResponse increaseContainersResource( + IncreaseContainersResourceRequest request) throws IOException, + IOException { + Exception e = new Exception("Dummy function", new Exception( + "Dummy function cause")); + throw new IOException(e); + } } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ContainerManagementProtocol.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ContainerManagementProtocol.java index 7aa43df..01f2f52 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ContainerManagementProtocol.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ContainerManagementProtocol.java @@ -22,6 +22,8 @@ import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; @@ -38,9 +40,9 @@ /** *

The protocol between an ApplicationMaster and a - * NodeManager to start/stop containers and to get status - * of running containers.

- * + * NodeManager to start/stop and increase resource of containers + * and to get status of running containers.

+ * *

If security is enabled the NodeManager verifies that the * ApplicationMaster has truly been allocated the container * by the ResourceManager and also verifies all interactions such @@ -170,4 +172,24 @@ StopContainersResponse stopContainers(StopContainersRequest request) GetContainerStatusesResponse getContainerStatuses( GetContainerStatusesRequest request) throws YarnException, IOException; + + /** + *

+ * The API used by the ApplicationMaster to request for + * resource increase of running containers on the NodeManager. + *

+ * + * @param request + * request to increase resource of a list of containers + * @return response which includes a list of containerIds of containers + * whose resource has been successfully increased and a + * containerId-to-exception map for failed requests. + * + * @throws YarnException + * @throws IOException + */ + @Public + IncreaseContainersResourceResponse increaseContainersResource( + IncreaseContainersResourceRequest request) throws YarnException, + IOException; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/IncreaseContainersResourceRequest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/IncreaseContainersResourceRequest.java new file mode 100644 index 0000000..41c8a01 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/IncreaseContainersResourceRequest.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords; + +import java.util.List; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.yarn.api.ContainerManagementProtocol; +import org.apache.hadoop.yarn.api.records.NMToken; +import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.util.Records; + +/** + *

The request sent by Application Master to the + * Node Manager to change the resource quota of a container.

+ * + *

For container resource increase, the ApplicationMaster has + * to provide a security token. For container resource decrease, the + * ApplicationMaster has to provide the container ID and the + * target resource capability.

+ * + * @see ContainerManagementProtocol + * #increaseContainersResource(IncreaseContainersResourceRequest) + */ +@Public +@Stable +public abstract class IncreaseContainersResourceRequest { + @Public + @Stable + public static IncreaseContainersResourceRequest newInstance( + List containersToIncrease) { + IncreaseContainersResourceRequest request = + Records.newRecord(IncreaseContainersResourceRequest.class); + request.setContainersToIncrease(containersToIncrease); + return request; + } + + /** + * Get a list of container tokens to be used for authorization during + * container resource increase. + *

+ * Note: {@link NMToken} will be used for authenticating communication with + * {@code NodeManager}. + * @return the list of container tokens to be used for authorization during + * container resource increase. + * @see NMToken + * + */ + @Public + @Stable + public abstract List getContainersToIncrease(); + + /** + * Set container tokens to be used during container resource increase. + * The token is acquired from + * AllocateResponse.getIncreasedContainers. + * The token contains the container id and resource capability required for + * container resource increase. + * @param containersToIncrease the list of container tokens to be used + * for container resource increase. + */ + @Public + @Stable + public abstract void setContainersToIncrease( + List containersToIncrease); +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/IncreaseContainersResourceResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/IncreaseContainersResourceResponse.java new file mode 100644 index 0000000..824f2f7 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/IncreaseContainersResourceResponse.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.yarn.api.ContainerManagementProtocol; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.SerializedException; +import org.apache.hadoop.yarn.util.Records; + +import java.util.List; +import java.util.Map; + +/** + *

+ * The response sent by the NodeManager to the + * ApplicationMaster when asked to change container resource. + *

+ * + * @see ContainerManagementProtocol# + * increaseContainersResource(IncreaseContainersResourceRequest) + */ + +@Public +@Stable +public abstract class IncreaseContainersResourceResponse { + @Public + @Stable + public static IncreaseContainersResourceResponse newInstance( + List successfullyIncreasedContainers, + Map failedRequests) { + IncreaseContainersResourceResponse response = + Records.newRecord(IncreaseContainersResourceResponse.class); + response.setSuccessfullyIncreasedContainers( + successfullyIncreasedContainers); + response.setFailedRequests(failedRequests); + return response; + } + + /** + * Get the list of containerIds of containers whose resource + * have been successfully changed. + * + * @return the list of containerIds of containers whose resource have + * been successfully. + */ + @Public + @Stable + public abstract List getSuccessfullyIncreasedContainers(); + + /** + * Set the list of containerIds of containers whose resource have + * been successfully changed. + */ + @Private + @Stable + public abstract void setSuccessfullyIncreasedContainers( + List succeedIncreasedContainers); + + /** + * Get the containerId-to-exception map in which the exception indicates + * error from each container for failed requests. + */ + @Public + @Stable + public abstract Map getFailedRequests(); + + /** + * Set the containerId-to-exception map in which the exception indicates + * error from each container for failed requests. + */ + @Private + @Stable + public abstract void setFailedRequests( + Map failedRequests); +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerStatus.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerStatus.java index 5ccf6dc..b5e589c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerStatus.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerStatus.java @@ -114,4 +114,12 @@ public static ContainerStatus newInstance(ContainerId containerId, @Private @Unstable public abstract void setDiagnostics(String diagnostics); + + @Public + @Stable + public abstract Resource getCapability(); + + @Private + @Unstable + public abstract void setCapability(Resource capability); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/containermanagement_protocol.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/containermanagement_protocol.proto index 7b1647b..f06f6cb 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/containermanagement_protocol.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/containermanagement_protocol.proto @@ -34,4 +34,5 @@ service ContainerManagementProtocolService { rpc startContainers(StartContainersRequestProto) returns (StartContainersResponseProto); rpc stopContainers(StopContainersRequestProto) returns (StopContainersResponseProto); rpc getContainerStatuses(GetContainerStatusesRequestProto) returns (GetContainerStatusesResponseProto); + rpc increaseContainersResource(IncreaseContainersResourceRequestProto) returns (IncreaseContainersResourceResponseProto); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagementProtocolPBClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagementProtocolPBClientImpl.java index 15397e3..44037ca 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagementProtocolPBClientImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagementProtocolPBClientImpl.java @@ -30,12 +30,16 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.ContainerManagementProtocolPB; +import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.IncreaseContainersResourceRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.IncreaseContainersResourceResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerStatusesRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerStatusesResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainersRequestPBImpl; @@ -48,6 +52,7 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerStatusesRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainersRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.StopContainersRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.IncreaseContainersResourceRequestProto; import com.google.protobuf.ServiceException; @@ -128,4 +133,19 @@ public GetContainerStatusesResponse getContainerStatuses( return null; } } + + @Override + public IncreaseContainersResourceResponse increaseContainersResource( + IncreaseContainersResourceRequest request) throws YarnException, + IOException { + IncreaseContainersResourceRequestProto requestProto = + ((IncreaseContainersResourceRequestPBImpl)request).getProto(); + try { + return new IncreaseContainersResourceResponsePBImpl( + proxy.increaseContainersResource(null, requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + return null; + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ContainerManagementProtocolPBServiceImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ContainerManagementProtocolPBServiceImpl.java index 2d33e69..0f27f14 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ContainerManagementProtocolPBServiceImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ContainerManagementProtocolPBServiceImpl.java @@ -23,9 +23,12 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.ContainerManagementProtocolPB; +import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.IncreaseContainersResourceRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.IncreaseContainersResourceResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerStatusesRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerStatusesResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainersRequestPBImpl; @@ -33,6 +36,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StopContainersRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StopContainersResponsePBImpl; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.IncreaseContainersResourceRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.IncreaseContainersResourceResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerStatusesRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerStatusesResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainersRequestProto; @@ -94,4 +99,21 @@ public GetContainerStatusesResponseProto getContainerStatuses( throw new ServiceException(e); } } + + @Override + public IncreaseContainersResourceResponseProto increaseContainersResource( + RpcController controller, IncreaseContainersResourceRequestProto proto) + throws ServiceException { + IncreaseContainersResourceRequestPBImpl request = + new IncreaseContainersResourceRequestPBImpl(proto); + try { + IncreaseContainersResourceResponse response = + real.increaseContainersResource(request); + return ((IncreaseContainersResourceResponsePBImpl)response).getProto(); + } catch (YarnException e) { + throw new ServiceException(e); + } catch (IOException e) { + throw new ServiceException(e); + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/IncreaseContainersResourceRequestPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/IncreaseContainersResourceRequestPBImpl.java new file mode 100644 index 0000000..5f13b9a --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/IncreaseContainersResourceRequestPBImpl.java @@ -0,0 +1,176 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.security.proto.SecurityProtos.TokenProto; +import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest; +import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.IncreaseContainersResourceRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.IncreaseContainersResourceRequestProtoOrBuilder; + +import com.google.protobuf.TextFormat; + +@Private +@Unstable +public class IncreaseContainersResourceRequestPBImpl extends + IncreaseContainersResourceRequest { + IncreaseContainersResourceRequestProto proto = + IncreaseContainersResourceRequestProto.getDefaultInstance(); + IncreaseContainersResourceRequestProto.Builder builder = null; + boolean viaProto = false; + + private List containersToIncrease = null; + + public IncreaseContainersResourceRequestPBImpl() { + builder = IncreaseContainersResourceRequestProto.newBuilder(); + } + + public IncreaseContainersResourceRequestPBImpl( + IncreaseContainersResourceRequestProto proto) { + this.proto = proto; + viaProto = true; + } + + public IncreaseContainersResourceRequestProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) { + return false; + } + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + + private void mergeLocalToBuilder() { + if (this.containersToIncrease != null) { + addIncreaseContainersToProto(); + } + } + + private void mergeLocalToProto() { + if (viaProto) { + maybeInitBuilder(); + } + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = IncreaseContainersResourceRequestProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public List getContainersToIncrease() { + initContainersToIncrease(); + return this.containersToIncrease; + } + + @Override + public void setContainersToIncrease(List containersToIncrease) { + if (containersToIncrease == null) { + return; + } + initContainersToIncrease(); + this.containersToIncrease.clear(); + this.containersToIncrease.addAll(containersToIncrease); + } + + private void initContainersToIncrease() { + if (this.containersToIncrease != null) { + return; + } + IncreaseContainersResourceRequestProtoOrBuilder p = + viaProto ? proto : builder; + List list = p.getIncreaseContainersList(); + this.containersToIncrease = new ArrayList(); + + for (TokenProto c : list) { + this.containersToIncrease.add(convertFromProtoFormat(c)); + } + } + + private void addIncreaseContainersToProto() { + maybeInitBuilder(); + builder.clearIncreaseContainers(); + if (this.containersToIncrease == null) { + return; + } + Iterable iterable = new Iterable() { + @Override + public Iterator iterator() { + return new Iterator() { + Iterator iter = containersToIncrease.iterator(); + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public TokenProto next() { + return convertToProtoFormat(iter.next()); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }; + builder.addAllIncreaseContainers(iterable); + } + + private Token convertFromProtoFormat(TokenProto p) { + return new TokenPBImpl(p); + } + + private TokenProto convertToProtoFormat(Token t) { + return ((TokenPBImpl) t).getProto(); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/IncreaseContainersResourceResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/IncreaseContainersResourceResponsePBImpl.java new file mode 100644 index 0000000..138a478 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/IncreaseContainersResourceResponsePBImpl.java @@ -0,0 +1,241 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.SerializedException; +import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; +import org.apache.hadoop.yarn.proto.YarnProtos.SerializedExceptionProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.ContainerExceptionMapProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.IncreaseContainersResourceResponseProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.IncreaseContainersResourceResponseProtoOrBuilder; + +import com.google.protobuf.TextFormat; + +@Private +@Unstable +public class IncreaseContainersResourceResponsePBImpl extends + IncreaseContainersResourceResponse { + IncreaseContainersResourceResponseProto proto = + IncreaseContainersResourceResponseProto.getDefaultInstance(); + IncreaseContainersResourceResponseProto.Builder builder = null; + boolean viaProto = false; + private List succeededRequests = null; + private Map failedRequests = null; + + public IncreaseContainersResourceResponsePBImpl() { + builder = IncreaseContainersResourceResponseProto.newBuilder(); + } + + public IncreaseContainersResourceResponsePBImpl( + IncreaseContainersResourceResponseProto proto) { + this.proto = proto; + viaProto = true; + } + + public IncreaseContainersResourceResponseProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) { + return false; + } + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + + private void mergeLocalToBuilder() { + if (this.succeededRequests != null) { + addSucceededRequestsToProto(); + } + if (this.failedRequests != null) { + addFailedRequestsToProto(); + } + } + + private void mergeLocalToProto() { + if (viaProto) { + maybeInitBuilder(); + } + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = IncreaseContainersResourceResponseProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public List getSuccessfullyIncreasedContainers() { + initSucceededRequests(); + return this.succeededRequests; + } + + @Override + public void setSuccessfullyIncreasedContainers( + List succeededRequests) { + maybeInitBuilder(); + if (succeededRequests == null) { + builder.clearSucceededRequests(); + } + this.succeededRequests = succeededRequests; + } + + private void initSucceededRequests() { + if (this.succeededRequests != null) { + return; + } + IncreaseContainersResourceResponseProtoOrBuilder p = + viaProto ? proto : builder; + List list = p.getSucceededRequestsList(); + this.succeededRequests = new ArrayList(); + for (ContainerIdProto c : list) { + this.succeededRequests.add(convertFromProtoFormat(c)); + } + } + + private void addSucceededRequestsToProto() { + maybeInitBuilder(); + builder.clearSucceededRequests(); + if (this.succeededRequests == null) { + return; + } + Iterable iterable = new Iterable() { + @Override + public Iterator iterator() { + return new Iterator() { + Iterator iter = succeededRequests.iterator(); + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public ContainerIdProto next() { + return convertToProtoFormat(iter.next()); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }; + builder.addAllSucceededRequests(iterable); + } + + @Override + public Map getFailedRequests() { + initFailedRequests(); + return this.failedRequests; + } + + @Override + public void setFailedRequests( + Map failedRequests) { + maybeInitBuilder(); + if (failedRequests == null) { + builder.clearFailedRequests(); + } + this.failedRequests = failedRequests; + } + + private void initFailedRequests() { + if (this.failedRequests != null) { + return; + } + IncreaseContainersResourceResponseProtoOrBuilder + p = viaProto ? proto : builder; + List protoList = p.getFailedRequestsList(); + this.failedRequests = new HashMap(); + for (ContainerExceptionMapProto ce : protoList) { + this.failedRequests.put(convertFromProtoFormat(ce.getContainerId()), + convertFromProtoFormat(ce.getException())); + } + } + + private void addFailedRequestsToProto() { + maybeInitBuilder(); + builder.clearFailedRequests(); + if (this.failedRequests == null) { + return; + } + List protoList = + new ArrayList(); + + for (Map.Entry entry : this.failedRequests + .entrySet()) { + protoList.add(ContainerExceptionMapProto.newBuilder() + .setContainerId(convertToProtoFormat(entry.getKey())) + .setException(convertToProtoFormat(entry.getValue())).build()); + } + builder.addAllFailedRequests(protoList); + } + + private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) { + return new ContainerIdPBImpl(p); + } + + private ContainerIdProto convertToProtoFormat(ContainerId t) { + return ((ContainerIdPBImpl) t).getProto(); + } + + private SerializedExceptionPBImpl convertFromProtoFormat( + SerializedExceptionProto p) { + return new SerializedExceptionPBImpl(p); + } + + private SerializedExceptionProto convertToProtoFormat(SerializedException t) { + return ((SerializedExceptionPBImpl) t).getProto(); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerStatusPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerStatusPBImpl.java index 86f2af9..d33d06d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerStatusPBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerStatusPBImpl.java @@ -24,6 +24,8 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStateProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto; @@ -78,6 +80,7 @@ public String toString() { sb.append("ContainerStatus: ["); sb.append("ContainerId: ").append(getContainerId()).append(", "); sb.append("State: ").append(getState()).append(", "); + sb.append("Capability: ").append(getCapability()).append(", "); sb.append("Diagnostics: ").append(getDiagnostics()).append(", "); sb.append("ExitStatus: ").append(getExitStatus()).append(", "); sb.append("]"); @@ -168,6 +171,25 @@ public synchronized void setDiagnostics(String diagnostics) { builder.setDiagnostics(diagnostics); } + @Override + public synchronized Resource getCapability() { + ContainerStatusProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasCapability()) { + return null; + } + return convertFromProtoFormat(p.getCapability()); + } + + @Override + public synchronized void setCapability(Resource capability) { + maybeInitBuilder(); + if (capability == null) { + builder.clearCapability(); + return; + } + builder.setCapability(convertToProtoFormat(capability)); + } + private ContainerStateProto convertToProtoFormat(ContainerState e) { return ProtoUtils.convertToProtoFormat(e); } @@ -184,6 +206,11 @@ private ContainerIdProto convertToProtoFormat(ContainerId t) { return ((ContainerIdPBImpl)t).getProto(); } + private ResourceProto convertToProtoFormat(Resource e) { + return ((ResourcePBImpl)e).getProto(); + } - -} + private ResourcePBImpl convertFromProtoFormat(ResourceProto p) { + return new ResourcePBImpl(p); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestIncreaseContainersResourceRequest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestIncreaseContainersResourceRequest.java new file mode 100644 index 0000000..0e5cf14 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestIncreaseContainersResourceRequest.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api; + +import java.util.ArrayList; +import java.util.List; +import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.IncreaseContainersResourceRequestPBImpl; +import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.IncreaseContainersResourceRequestProto; +import org.junit.Assert; +import org.junit.Test; + +public class TestIncreaseContainersResourceRequest { + + @Test + public void testIncreaseContainersResourceRequest() { + List containerToIncrease = new ArrayList(); + for (int i = 0; i < 10; i++) { + Token ctx = + Token.newInstance("identifier".getBytes(), "simple", + "passwd".getBytes(), "service"); + containerToIncrease.add(ctx); + } + IncreaseContainersResourceRequest request = + IncreaseContainersResourceRequest.newInstance(containerToIncrease + ); + IncreaseContainersResourceRequestProto proto = + ((IncreaseContainersResourceRequestPBImpl) request).getProto(); + IncreaseContainersResourceRequest requestFromProto = + new IncreaseContainersResourceRequestPBImpl(proto); + // verify the whole record equals with original record + Assert.assertEquals(requestFromProto, request); + Assert.assertEquals(requestFromProto.getContainersToIncrease().size(), + containerToIncrease.size()); + } + + @Test + public void testIncreaseContainersResourceRequestWithNull() { + IncreaseContainersResourceRequest request = + IncreaseContainersResourceRequest.newInstance(null); + IncreaseContainersResourceRequestProto proto = + ((IncreaseContainersResourceRequestPBImpl) request).getProto(); + request = new IncreaseContainersResourceRequestPBImpl(proto); + // check value + Assert.assertEquals(0, request.getContainersToIncrease().size()); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java index e2071dd..45a13ab 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java @@ -31,6 +31,8 @@ import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; @@ -166,5 +168,11 @@ public GetContainerStatusesResponse getContainerStatuses( GetContainerStatusesResponse.newInstance(list, null); return null; } + + @Override + public IncreaseContainersResourceResponse increaseContainersResource( + IncreaseContainersResourceRequest request) throws YarnException, IOException { + return null; + } } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java index 39e6162..2eb0eb5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java @@ -33,6 +33,8 @@ import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.ContainerManagementProtocolPB; +import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; @@ -219,6 +221,12 @@ public StopContainersResponse stopContainers(StopContainersRequest request) new Exception(EXCEPTION_CAUSE)); throw new YarnException(e); } + + @Override + public IncreaseContainersResourceResponse increaseContainersResource( + IncreaseContainersResourceRequest request) throws YarnException, IOException { + return null; + } } public static ContainerTokenIdentifier newContainerTokenIdentifier(