diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java
index dd1060c..0736b10 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java
@@ -54,6 +54,8 @@
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
@@ -71,6 +73,7 @@
import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy;
import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy.ContainerManagementProtocolProxyData;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC;
@@ -438,5 +441,12 @@ public StopContainersResponse stopContainers(StopContainersRequest request)
"Dummy function cause"));
throw new IOException(e);
}
+
+ @Override
+ public ChangeContainersResourceResponse changeContainersResource(
+ ChangeContainersResourceRequest request) throws YarnException,
+ IOException {
+ return null;
+ }
}
}
\ No newline at end of file
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java
index 6f21c87..1f21568 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java
@@ -45,6 +45,8 @@
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher.EventType;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
@@ -443,6 +445,12 @@ public GetContainerStatusesResponse getContainerStatuses(
GetContainerStatusesRequest request) throws IOException {
return null;
}
+ @Override
+ public ChangeContainersResourceResponse changeContainersResource(
+ ChangeContainersResourceRequest request) throws YarnException,
+ IOException {
+ return null;
+ }
}
@SuppressWarnings("serial")
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ContainerManagementProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ContainerManagementProtocol.java
index 7aa43df..8d84789 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ContainerManagementProtocol.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ContainerManagementProtocol.java
@@ -22,6 +22,8 @@
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
+import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
@@ -170,4 +172,16 @@ StopContainersResponse stopContainers(StopContainersRequest request)
GetContainerStatusesResponse getContainerStatuses(
GetContainerStatusesRequest request) throws YarnException,
IOException;
+
+ /**
+ * The API used by the ApplicationMaster to request change
+ * running containers in this node
+ *
+ * @throws YarnException
+ * @throws IOException
+ */
+ @Public
+ ChangeContainersResourceResponse changeContainersResource(
+ ChangeContainersResourceRequest request) throws YarnException,
+ IOException;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ChangeContainersResourceRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ChangeContainersResourceRequest.java
new file mode 100644
index 0000000..ab37f80
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ChangeContainersResourceRequest.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.protocolrecords;
+
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Stable;
+import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
+import org.apache.hadoop.yarn.api.records.ContainerResourceDecrease;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ *
+ * The request sent by Application Master to ask
+ * Node Manager change resource quota used by a specified container
+ *
+ *
+ * @see ContainerManagementProtocol#changeContainersResource(ChangeContainersResourceRequest)
+ */
+@Public
+@Stable
+public abstract class ChangeContainersResourceRequest {
+ @Public
+ @Stable
+ public static ChangeContainersResourceRequest newInstance(
+ List containersTokensToIncrease,
+ List containersToDecrease) {
+ ChangeContainersResourceRequest request =
+ Records.newRecord(ChangeContainersResourceRequest.class);
+ request.setContainersToIncrease(containersTokensToIncrease);
+ request.setContainersToDecrease(containersToDecrease);
+ return request;
+ }
+
+ /**
+ * get tokens of container need to be increased.
+ */
+ @Public
+ @Stable
+ public abstract List getContainersToIncrease();
+
+ /**
+ * set tokens of containers need to be increased, token is acquired in
+ * AllocateResponse.getIncreasedContainers. Here we only need
+ * the token in ContainerResourceIncrease because container id
+ * and capability are already contained in token
+ */
+ @Public
+ @Stable
+ public abstract void setContainersToIncrease(
+ List containersTokensToIncrease);
+
+ /**
+ * get decrease containers sent by ApplicationMaster
+ */
+ @Public
+ @Stable
+ public abstract List getContainersToDecrease();
+
+ /**
+ * set containers need to be decreased by ApplicationMaster to
+ * NodeManager
+ */
+ @Public
+ @Stable
+ public abstract void setContainersToDecrease(
+ List containersToDecrease);
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ChangeContainersResourceResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ChangeContainersResourceResponse.java
new file mode 100644
index 0000000..9bd7708
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ChangeContainersResourceResponse.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.protocolrecords;
+
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Stable;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.util.Records;
+
+@Public
+@Stable
+public abstract class ChangeContainersResourceResponse {
+ @Public
+ @Stable
+ public static ChangeContainersResourceResponse newInstance(
+ List succeedChangedContainers,
+ List failedChangedContainers) {
+ ChangeContainersResourceResponse request =
+ Records.newRecord(ChangeContainersResourceResponse.class);
+ request.setSucceedChangedContainers(succeedChangedContainers);
+ request.setFailedChangedContainers(failedChangedContainers);
+ return request;
+ }
+
+ @Public
+ @Stable
+ public abstract List getSucceedChangedContainers();
+
+ @Public
+ @Stable
+ public abstract void setSucceedChangedContainers(
+ List succeedIncreasedContainers);
+
+ @Public
+ @Stable
+ public abstract List getFailedChangedContainers();
+
+ @Public
+ @Stable
+ public abstract void setFailedChangedContainers(
+ List succeedIncreasedContainers);
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/containermanagement_protocol.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/containermanagement_protocol.proto
index 7b1647b..dd7874a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/containermanagement_protocol.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/containermanagement_protocol.proto
@@ -34,4 +34,5 @@ service ContainerManagementProtocolService {
rpc startContainers(StartContainersRequestProto) returns (StartContainersResponseProto);
rpc stopContainers(StopContainersRequestProto) returns (StopContainersResponseProto);
rpc getContainerStatuses(GetContainerStatusesRequestProto) returns (GetContainerStatusesResponseProto);
+ rpc changeContainersResource(ChangeContainersResourceRequestProto) returns (ChangeContainersResourceResponseProto);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
index 332be81..e8f902d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
@@ -229,3 +229,13 @@ message GetContainerStatusesResponseProto {
repeated ContainerStatusProto status = 1;
repeated ContainerExceptionMapProto failed_requests = 2;
}
+
+message ChangeContainersResourceRequestProto {
+ repeated hadoop.common.TokenProto increase_containers = 1;
+ repeated ContainerResourceDecreaseProto decrease_containers = 2;
+}
+
+message ChangeContainersResourceResponseProto {
+ repeated ContainerIdProto succeed_changed_containers = 1;
+ repeated ContainerIdProto failed_changed_containers = 2;
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagementProtocolPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagementProtocolPBClientImpl.java
index 15397e3..63e3612 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagementProtocolPBClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagementProtocolPBClientImpl.java
@@ -30,12 +30,16 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.ContainerManagementProtocolPB;
+import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ChangeContainersResourceRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ChangeContainersResourceResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerStatusesRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerStatusesResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainersRequestPBImpl;
@@ -45,6 +49,7 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.ChangeContainersResourceRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerStatusesRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainersRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.StopContainersRequestProto;
@@ -128,4 +133,17 @@ public GetContainerStatusesResponse getContainerStatuses(
return null;
}
}
+
+ @Override
+ public ChangeContainersResourceResponse changeContainersResource(
+ ChangeContainersResourceRequest request) throws YarnException,
+ IOException {
+ ChangeContainersResourceRequestProto requestProto = ((ChangeContainersResourceRequestPBImpl)request).getProto();
+ try {
+ return new ChangeContainersResourceResponsePBImpl(proxy.changeContainersResource(null, requestProto));
+ } catch (ServiceException e) {
+ RPCUtil.unwrapAndThrowException(e);
+ return null;
+ }
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ContainerManagementProtocolPBServiceImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ContainerManagementProtocolPBServiceImpl.java
index 2d33e69..55a9d33 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ContainerManagementProtocolPBServiceImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ContainerManagementProtocolPBServiceImpl.java
@@ -23,9 +23,12 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.ContainerManagementProtocolPB;
+import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ChangeContainersResourceRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ChangeContainersResourceResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerStatusesRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerStatusesResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainersRequestPBImpl;
@@ -33,6 +36,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StopContainersRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StopContainersResponsePBImpl;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.ChangeContainersResourceRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.ChangeContainersResourceResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerStatusesRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerStatusesResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainersRequestProto;
@@ -94,4 +99,19 @@ public GetContainerStatusesResponseProto getContainerStatuses(
throw new ServiceException(e);
}
}
+
+ @Override
+ public ChangeContainersResourceResponseProto changeContainersResource(
+ RpcController controller, ChangeContainersResourceRequestProto proto)
+ throws ServiceException {
+ ChangeContainersResourceRequestPBImpl request = new ChangeContainersResourceRequestPBImpl(proto);
+ try {
+ ChangeContainersResourceResponse response = real.changeContainersResource(request);
+ return ((ChangeContainersResourceResponsePBImpl)response).getProto();
+ } catch (YarnException e) {
+ throw new ServiceException(e);
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/ChangeContainersResourceRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/ChangeContainersResourceRequestPBImpl.java
new file mode 100644
index 0000000..45b5f89
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/ChangeContainersResourceRequestPBImpl.java
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
+import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceRequest;
+import org.apache.hadoop.yarn.api.records.ContainerResourceDecrease;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerResourceDecreasePBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl;
+import org.apache.hadoop.yarn.proto.YarnProtos.ContainerResourceDecreaseProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.ChangeContainersResourceRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.ChangeContainersResourceRequestProtoOrBuilder;
+
+import com.google.protobuf.TextFormat;
+
+@Private
+@Unstable
+public class ChangeContainersResourceRequestPBImpl extends
+ ChangeContainersResourceRequest {
+ ChangeContainersResourceRequestProto proto =
+ ChangeContainersResourceRequestProto.getDefaultInstance();
+ ChangeContainersResourceRequestProto.Builder builder = null;
+ boolean viaProto = false;
+
+ private List containersToIncrease = null;
+ private List containersToDecrease = null;
+
+ public ChangeContainersResourceRequestPBImpl() {
+ builder = ChangeContainersResourceRequestProto.newBuilder();
+ }
+
+ public ChangeContainersResourceRequestPBImpl(
+ ChangeContainersResourceRequestProto proto) {
+ this.proto = proto;
+ viaProto = true;
+ }
+
+ public ChangeContainersResourceRequestProto getProto() {
+ mergeLocalToProto();
+ proto = viaProto ? proto : builder.build();
+ viaProto = true;
+ return proto;
+ }
+
+ @Override
+ public int hashCode() {
+ return getProto().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == null) {
+ return false;
+ }
+ if (other.getClass().isAssignableFrom(this.getClass())) {
+ return this.getProto().equals(this.getClass().cast(other).getProto());
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return TextFormat.shortDebugString(getProto());
+ }
+
+ private void mergeLocalToBuilder() {
+ if (this.containersToIncrease != null) {
+ addIncreaseContainersToProto();
+ }
+ if (this.containersToDecrease != null) {
+ addDecreaseContainersToProto();
+ }
+ }
+
+ private void mergeLocalToProto() {
+ if (viaProto) {
+ maybeInitBuilder();
+ }
+ mergeLocalToBuilder();
+ proto = builder.build();
+ viaProto = true;
+ }
+
+ private void maybeInitBuilder() {
+ if (viaProto || builder == null) {
+ builder = ChangeContainersResourceRequestProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+
+ @Override
+ public List getContainersToIncrease() {
+ initContainersToIncrease();
+ return this.containersToIncrease;
+ }
+
+ @Override
+ public void setContainersToIncrease(List containersToIncrease) {
+ if (containersToIncrease == null) {
+ return;
+ }
+ initContainersToIncrease();
+ this.containersToIncrease.clear();
+ this.containersToIncrease.addAll(containersToIncrease);
+ }
+
+ @Override
+ public List getContainersToDecrease() {
+ initContainersToDecrease();
+ return this.containersToDecrease;
+ }
+
+ @Override
+ public void setContainersToDecrease(
+ List containersToDecrease) {
+ if (containersToDecrease == null) {
+ return;
+ }
+ initContainersToDecrease();
+ this.containersToDecrease.clear();
+ this.containersToDecrease.addAll(containersToDecrease);
+ }
+
+ private void initContainersToIncrease() {
+ if (this.containersToIncrease != null) {
+ return;
+ }
+ ChangeContainersResourceRequestProtoOrBuilder p =
+ viaProto ? proto : builder;
+ List list = p.getIncreaseContainersList();
+ this.containersToIncrease = new ArrayList();
+
+ for (TokenProto c : list) {
+ this.containersToIncrease.add(convertFromProtoFormat(c));
+ }
+ }
+
+ private void addIncreaseContainersToProto() {
+ maybeInitBuilder();
+ builder.clearIncreaseContainers();
+ if (this.containersToIncrease == null) {
+ return;
+ }
+ Iterable iterable = new Iterable() {
+ @Override
+ public Iterator iterator() {
+ return new Iterator() {
+ Iterator iter = containersToIncrease.iterator();
+
+ @Override
+ public boolean hasNext() {
+ return iter.hasNext();
+ }
+
+ @Override
+ public TokenProto next() {
+ return convertToProtoFormat(iter.next());
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+ };
+ builder.addAllIncreaseContainers(iterable);
+ }
+
+ private void initContainersToDecrease() {
+ if (this.containersToDecrease != null) {
+ return;
+ }
+ ChangeContainersResourceRequestProtoOrBuilder p =
+ viaProto ? proto : builder;
+ List list = p.getDecreaseContainersList();
+ this.containersToDecrease = new ArrayList();
+
+ for (ContainerResourceDecreaseProto c : list) {
+ this.containersToDecrease.add(convertFromProtoFormat(c));
+ }
+ }
+
+ private void addDecreaseContainersToProto() {
+ maybeInitBuilder();
+ builder.clearDecreaseContainers();
+ if (this.containersToDecrease == null) {
+ return;
+ }
+ Iterable iterable =
+ new Iterable() {
+ @Override
+ public Iterator iterator() {
+ return new Iterator() {
+ Iterator iter = containersToDecrease
+ .iterator();
+
+ @Override
+ public boolean hasNext() {
+ return iter.hasNext();
+ }
+
+ @Override
+ public ContainerResourceDecreaseProto next() {
+ return convertToProtoFormat(iter.next());
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+ };
+ builder.addAllDecreaseContainers(iterable);
+ }
+
+ private Token convertFromProtoFormat(TokenProto p) {
+ return new TokenPBImpl(p);
+ }
+
+ private TokenProto convertToProtoFormat(Token t) {
+ return ((TokenPBImpl) t).getProto();
+ }
+
+ private ContainerResourceDecrease convertFromProtoFormat(
+ ContainerResourceDecreaseProto p) {
+ return new ContainerResourceDecreasePBImpl(p);
+ }
+
+ private ContainerResourceDecreaseProto convertToProtoFormat(
+ ContainerResourceDecrease t) {
+ return ((ContainerResourceDecreasePBImpl) t).getProto();
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/ChangeContainersResourceResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/ChangeContainersResourceResponsePBImpl.java
new file mode 100644
index 0000000..12f89c5
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/ChangeContainersResourceResponsePBImpl.java
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceResponse;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
+import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.ChangeContainersResourceResponseProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.ChangeContainersResourceResponseProtoOrBuilder;
+
+import com.google.protobuf.TextFormat;
+
+@Private
+@Unstable
+public class ChangeContainersResourceResponsePBImpl extends
+ ChangeContainersResourceResponse {
+ ChangeContainersResourceResponseProto proto =
+ ChangeContainersResourceResponseProto.getDefaultInstance();
+ ChangeContainersResourceResponseProto.Builder builder = null;
+ boolean viaProto = false;
+
+ private List succeedChangedContainers = null;
+ private List failedChangedContainers = null;
+
+ public ChangeContainersResourceResponsePBImpl() {
+ builder = ChangeContainersResourceResponseProto.newBuilder();
+ }
+
+ public ChangeContainersResourceResponsePBImpl(
+ ChangeContainersResourceResponseProto proto) {
+ this.proto = proto;
+ viaProto = true;
+ }
+
+ public ChangeContainersResourceResponseProto getProto() {
+ mergeLocalToProto();
+ proto = viaProto ? proto : builder.build();
+ viaProto = true;
+ return proto;
+ }
+
+ @Override
+ public int hashCode() {
+ return getProto().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == null) {
+ return false;
+ }
+ if (other.getClass().isAssignableFrom(this.getClass())) {
+ return this.getProto().equals(this.getClass().cast(other).getProto());
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return TextFormat.shortDebugString(getProto());
+ }
+
+ private void mergeLocalToBuilder() {
+ if (this.succeedChangedContainers != null) {
+ addSucceedChangedContainersToProto();
+ }
+ if (this.failedChangedContainers != null) {
+ addFailedChangedContainersToProto();
+ }
+ }
+
+ private void mergeLocalToProto() {
+ if (viaProto) {
+ maybeInitBuilder();
+ }
+ mergeLocalToBuilder();
+ proto = builder.build();
+ viaProto = true;
+ }
+
+ private void maybeInitBuilder() {
+ if (viaProto || builder == null) {
+ builder = ChangeContainersResourceResponseProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+
+ @Override
+ public List getSucceedChangedContainers() {
+ initSucceedChangedContainers();
+ return this.succeedChangedContainers;
+ }
+
+ @Override
+ public void setSucceedChangedContainers(List containers) {
+ if (containers == null) {
+ return;
+ }
+ initSucceedChangedContainers();
+ this.succeedChangedContainers.clear();
+ this.succeedChangedContainers.addAll(containers);
+ }
+
+ private void initSucceedChangedContainers() {
+ if (this.succeedChangedContainers != null) {
+ return;
+ }
+ ChangeContainersResourceResponseProtoOrBuilder p =
+ viaProto ? proto : builder;
+ List list = p.getSucceedChangedContainersList();
+ this.succeedChangedContainers = new ArrayList();
+
+ for (ContainerIdProto c : list) {
+ this.succeedChangedContainers.add(convertFromProtoFormat(c));
+ }
+ }
+
+ private void addSucceedChangedContainersToProto() {
+ maybeInitBuilder();
+ builder.clearSucceedChangedContainers();
+ if (this.succeedChangedContainers == null) {
+ return;
+ }
+ Iterable iterable = new Iterable() {
+ @Override
+ public Iterator iterator() {
+ return new Iterator() {
+ Iterator iter = succeedChangedContainers.iterator();
+
+ @Override
+ public boolean hasNext() {
+ return iter.hasNext();
+ }
+
+ @Override
+ public ContainerIdProto next() {
+ return convertToProtoFormat(iter.next());
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+ };
+ builder.addAllSucceedChangedContainers(iterable);
+ }
+
+ @Override
+ public List getFailedChangedContainers() {
+ initFailedChangedContainers();
+ return this.failedChangedContainers;
+ }
+
+ @Override
+ public void setFailedChangedContainers(List containers) {
+ if (containers == null) {
+ return;
+ }
+ initFailedChangedContainers();
+ this.failedChangedContainers.clear();
+ this.failedChangedContainers.addAll(containers);
+ }
+
+ private void initFailedChangedContainers() {
+ if (this.failedChangedContainers != null) {
+ return;
+ }
+ ChangeContainersResourceResponseProtoOrBuilder p =
+ viaProto ? proto : builder;
+ List list = p.getFailedChangedContainersList();
+ this.failedChangedContainers = new ArrayList();
+
+ for (ContainerIdProto c : list) {
+ this.failedChangedContainers.add(convertFromProtoFormat(c));
+ }
+ }
+
+ private void addFailedChangedContainersToProto() {
+ maybeInitBuilder();
+ builder.clearFailedChangedContainers();
+ if (this.failedChangedContainers == null) {
+ return;
+ }
+ Iterable iterable = new Iterable() {
+ @Override
+ public Iterator iterator() {
+ return new Iterator() {
+ Iterator iter = failedChangedContainers.iterator();
+
+ @Override
+ public boolean hasNext() {
+ return iter.hasNext();
+ }
+
+ @Override
+ public ContainerIdProto next() {
+ return convertToProtoFormat(iter.next());
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+ };
+ builder.addAllFailedChangedContainers(iterable);
+ }
+
+ private ContainerId convertFromProtoFormat(ContainerIdProto p) {
+ return new ContainerIdPBImpl(p);
+ }
+
+ private ContainerIdProto convertToProtoFormat(ContainerId t) {
+ return ((ContainerIdPBImpl) t).getProto();
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java
index 8fe5c3c..6771d88 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java
@@ -33,6 +33,8 @@
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
@@ -165,5 +167,12 @@ public GetContainerStatusesResponse getContainerStatuses(
GetContainerStatusesResponse.newInstance(list, null);
return null;
}
+
+ @Override
+ public ChangeContainersResourceResponse changeContainersResource(
+ ChangeContainersResourceRequest request) throws YarnException,
+ IOException {
+ return null;
+ }
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java
index 76384d3..8ae6a9d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java
@@ -35,6 +35,8 @@
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.ContainerManagementProtocolPB;
+import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
@@ -218,6 +220,13 @@ public StopContainersResponse stopContainers(StopContainersRequest request)
new Exception(EXCEPTION_CAUSE));
throw new YarnException(e);
}
+
+ @Override
+ public ChangeContainersResourceResponse changeContainersResource(
+ ChangeContainersResourceRequest request) throws YarnException,
+ IOException {
+ return null;
+ }
}
public static ContainerTokenIdentifier newContainerTokenIdentifier(
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestResourceChangeRPC.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestResourceChangeRPC.java
new file mode 100644
index 0000000..61102ce
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestResourceChangeRPC.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerResourceDecrease;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.junit.Test;
+
+public class TestResourceChangeRPC {
+
+ static final Log LOG = LogFactory.getLog(TestContainerLaunchRPC.class);
+
+ @Test
+ public void testResourceChangeRPC() throws Exception {
+ testResourceChange(HadoopYarnProtoRPC.class.getName());
+ }
+
+ private void testResourceChange(String rpcClass) throws Exception {
+ Configuration conf = new Configuration();
+ conf.set(YarnConfiguration.IPC_RPC_IMPL, rpcClass);
+ YarnRPC rpc = YarnRPC.create(conf);
+ String bindAddr = "localhost:0";
+ InetSocketAddress addr = NetUtils.createSocketAddr(bindAddr);
+ Server server =
+ rpc.getServer(ContainerManagementProtocol.class,
+ new DummyContainerManager(), addr, conf, null, 1);
+ server.start();
+ try {
+
+ ContainerManagementProtocol proxy =
+ (ContainerManagementProtocol) rpc.getProxy(
+ ContainerManagementProtocol.class, server.getListenerAddress(),
+ conf);
+
+ List inc = new ArrayList();
+ List dec =
+ new ArrayList();
+
+ for (int i = 0; i < 3; i++) {
+ inc.add(Token.newInstance("id".getBytes(), "pb", "passwd".getBytes(),
+ "service"));
+ }
+ for (int i = 0; i < 5; i++) {
+ dec.add(ContainerResourceDecrease.newInstance(null, null));
+ }
+
+ ChangeContainersResourceRequest req =
+ ChangeContainersResourceRequest.newInstance(inc, dec);
+
+ try {
+ ChangeContainersResourceResponse res =
+ proxy.changeContainersResource(req);
+ Assert.assertEquals(inc.size(), res.getSucceedChangedContainers()
+ .size());
+ Assert
+ .assertEquals(dec.size(), res.getFailedChangedContainers().size());
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ } finally {
+ server.stop();
+ }
+ }
+
+ public class DummyContainerManager implements ContainerManagementProtocol {
+
+ @Override
+ public StartContainersResponse startContainers(
+ StartContainersRequest requests) throws YarnException, IOException {
+ return null;
+ }
+
+ @Override
+ public StopContainersResponse
+ stopContainers(StopContainersRequest requests) throws YarnException,
+ IOException {
+ return null;
+ }
+
+ @Override
+ public GetContainerStatusesResponse getContainerStatuses(
+ GetContainerStatusesRequest request) throws YarnException, IOException {
+ return null;
+ }
+
+ @Override
+ public ChangeContainersResourceResponse changeContainersResource(
+ ChangeContainersResourceRequest request) throws YarnException,
+ IOException {
+ ContainerId cid = ContainerId.newInstance(null, 0);
+ List inc = new ArrayList();
+ List dec = new ArrayList();
+ for (int i = 0; i < request.getContainersToIncrease().size(); i++) {
+ inc.add(cid);
+ }
+ for (int i = 0; i < request.getContainersToDecrease().size(); i++) {
+ dec.add(cid);
+ }
+
+ ChangeContainersResourceResponse res =
+ ChangeContainersResourceResponse.newInstance(inc, dec);
+ return res;
+ }
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestChangeContainersResourceRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestChangeContainersResourceRequest.java
new file mode 100644
index 0000000..3087c14
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestChangeContainersResourceRequest.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ChangeContainersResourceRequestPBImpl;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerResourceDecrease;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.ChangeContainersResourceRequestProto;
+import org.junit.Test;
+
+public class TestChangeContainersResourceRequest {
+ int id = 0;
+
+ private ContainerResourceDecrease getNextResourceChangeContext() {
+ ContainerId containerId =
+ ContainerId.newInstance(ApplicationAttemptId.newInstance(
+ ApplicationId.newInstance(1234, 3), 3), id++);
+ Resource resource = Resource.newInstance(1023, 3);
+ ContainerResourceDecrease rcContext =
+ ContainerResourceDecrease.newInstance(containerId, resource);
+ return rcContext;
+ }
+
+ @Test
+ public void testChangeContainersResourceRequest() {
+ List containerToIncrease = new ArrayList();
+ List containerToDecrease =
+ new ArrayList();
+
+ for (int i = 0; i < 10; i++) {
+ Token ctx =
+ Token.newInstance("identifier".getBytes(), "simple",
+ "passwd".getBytes(), "service");
+ containerToIncrease.add(ctx);
+ }
+ for (int i = 0; i < 5; i++) {
+ ContainerResourceDecrease ctx = getNextResourceChangeContext();
+ containerToDecrease.add(ctx);
+ }
+
+ ChangeContainersResourceRequest request =
+ ChangeContainersResourceRequest.newInstance(containerToIncrease,
+ containerToDecrease);
+
+ // serde
+ ChangeContainersResourceRequestProto proto =
+ ((ChangeContainersResourceRequestPBImpl) request).getProto();
+ request = new ChangeContainersResourceRequestPBImpl(proto);
+
+ // check value
+ Assert.assertEquals(request.getContainersToIncrease().size(),
+ containerToIncrease.size());
+ Assert.assertEquals(request.getContainersToDecrease().size(),
+ containerToDecrease.size());
+ for (int i = 0; i < containerToDecrease.size(); i++) {
+ Assert.assertEquals(request.getContainersToDecrease().get(i),
+ containerToDecrease.get(i));
+ }
+ }
+
+ @Test
+ public void testChangeContainersResourceRequestWithNull() {
+ ChangeContainersResourceRequest request =
+ ChangeContainersResourceRequest.newInstance(null, null);
+
+ // serde
+ ChangeContainersResourceRequestProto proto =
+ ((ChangeContainersResourceRequestPBImpl) request).getProto();
+ request = new ChangeContainersResourceRequestPBImpl(proto);
+
+ // check value
+ Assert.assertEquals(0, request.getContainersToIncrease().size());
+ Assert.assertEquals(0, request.getContainersToDecrease().size());
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestChangeContainersResourceResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestChangeContainersResourceResponse.java
new file mode 100644
index 0000000..2af49c1
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestChangeContainersResourceResponse.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ChangeContainersResourceResponsePBImpl;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.ChangeContainersResourceResponseProto;
+import org.junit.Test;
+
+public class TestChangeContainersResourceResponse {
+ int id = 0;
+
+ private ContainerId getNextContainerId() {
+ ContainerId containerId =
+ ContainerId.newInstance(ApplicationAttemptId.newInstance(
+ ApplicationId.newInstance(1234, 3), 3), id++);
+ return containerId;
+ }
+
+ @Test
+ public void testChangeContainersResourceRequest() {
+ List succeedChanged = new ArrayList();
+ List failChanged = new ArrayList();
+
+ for (int i = 0; i < 10; i++) {
+ succeedChanged.add(getNextContainerId());
+ }
+
+ for (int i = 0; i < 8; i++) {
+ failChanged.add(getNextContainerId());
+ }
+
+ ChangeContainersResourceResponse response =
+ ChangeContainersResourceResponse.newInstance(succeedChanged,
+ failChanged);
+
+ // serde
+ ChangeContainersResourceResponseProto proto =
+ ((ChangeContainersResourceResponsePBImpl) response).getProto();
+ response = new ChangeContainersResourceResponsePBImpl(proto);
+
+ // check value
+ Assert.assertEquals(response.getSucceedChangedContainers().size(),
+ succeedChanged.size());
+ Assert.assertEquals(response.getFailedChangedContainers().size(),
+ failChanged.size());
+ for (int i = 0; i < succeedChanged.size(); i++) {
+ Assert.assertEquals(response.getSucceedChangedContainers().get(i),
+ succeedChanged.get(i));
+ }
+ for (int i = 0; i < failChanged.size(); i++) {
+ Assert.assertEquals(response.getFailedChangedContainers().get(i),
+ failChanged.get(i));
+ }
+ }
+
+ @Test
+ public void testChangeContainersResourceRequestWithNull() {
+ ChangeContainersResourceResponse request =
+ ChangeContainersResourceResponse.newInstance(null, null);
+
+ // serde
+ ChangeContainersResourceResponseProto proto =
+ ((ChangeContainersResourceResponsePBImpl) request).getProto();
+ request = new ChangeContainersResourceResponsePBImpl(proto);
+
+ // check value
+ Assert.assertEquals(0, request.getSucceedChangedContainers().size());
+ Assert.assertEquals(0, request.getFailedChangedContainers().size());
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java
index aad819d..bde5c14 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java
@@ -20,23 +20,25 @@
import java.util.List;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerResourceDecrease;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.util.Records;
-
public abstract class NodeStatus {
public static NodeStatus newInstance(NodeId nodeId, int responseId,
List containerStatuses,
List keepAliveApplications,
- NodeHealthStatus nodeHealthStatus) {
+ NodeHealthStatus nodeHealthStatus,
+ List decreasedContainers) {
NodeStatus nodeStatus = Records.newRecord(NodeStatus.class);
nodeStatus.setResponseId(responseId);
nodeStatus.setNodeId(nodeId);
nodeStatus.setContainersStatuses(containerStatuses);
nodeStatus.setKeepAliveApplications(keepAliveApplications);
nodeStatus.setNodeHealthStatus(nodeHealthStatus);
+ nodeStatus.setDecreasedContainers(decreasedContainers);
return nodeStatus;
}
@@ -55,4 +57,9 @@ public abstract void setContainersStatuses(
public abstract void setNodeId(NodeId nodeId);
public abstract void setResponseId(int responseId);
+
+ public abstract void setDecreasedContainers(
+ List decreasedContainers);
+
+ public abstract List getDecreasedContainers();
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java
index 65376dc..c10ce74 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java
@@ -24,12 +24,15 @@
import java.util.List;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerResourceDecrease;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerResourceDecreasePBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.ContainerResourceDecreaseProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeHealthStatusProto;
@@ -48,6 +51,7 @@
private List containers = null;
private NodeHealthStatus nodeHealthStatus = null;
private List keepAliveApplications = null;
+ private List newDecreasedContainers = null;
public NodeStatusPBImpl() {
builder = NodeStatusProto.newBuilder();
@@ -78,6 +82,9 @@ private synchronized void mergeLocalToBuilder() {
if (this.keepAliveApplications != null) {
addKeepAliveApplicationsToProto();
}
+ if (this.newDecreasedContainers != null) {
+ addNewDecreasedContainersToProto();
+ }
}
private synchronized void mergeLocalToProto() {
@@ -163,6 +170,43 @@ public void remove() {
};
builder.addAllKeepAliveApplications(iterable);
}
+
+ private synchronized void addNewDecreasedContainersToProto() {
+ maybeInitBuilder();
+ builder.clearNewDecreasedContainers();
+ if (newDecreasedContainers == null)
+ return;
+ Iterable iterable =
+ new Iterable() {
+ @Override
+ public Iterator iterator() {
+ return new Iterator() {
+
+ Iterator iter = newDecreasedContainers
+ .iterator();
+
+ @Override
+ public boolean hasNext() {
+ return iter.hasNext();
+ }
+
+ @Override
+ public ContainerResourceDecreaseProto next() {
+ return convertToProtoFormat(iter.next());
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+
+ }
+ };
+
+ }
+ };
+ builder.addAllNewDecreasedContainers(iterable);
+ }
+
@Override
public int hashCode() {
@@ -291,6 +335,36 @@ public synchronized void setNodeHealthStatus(NodeHealthStatus healthStatus) {
this.nodeHealthStatus = healthStatus;
}
+ @Override
+ public synchronized void setDecreasedContainers(
+ List decreasedContainers) {
+ if (decreasedContainers == null) {
+ builder.clearNewDecreasedContainers();
+ }
+ this.newDecreasedContainers = decreasedContainers;
+ }
+
+ @Override
+ public synchronized List getDecreasedContainers() {
+ initDecreasedContainers();
+ return this.newDecreasedContainers;
+ }
+
+ private synchronized void initDecreasedContainers() {
+ if (this.newDecreasedContainers != null) {
+ return;
+ }
+ NodeStatusProtoOrBuilder p = viaProto ? proto : builder;
+ List list =
+ p.getNewDecreasedContainersList();
+ this.newDecreasedContainers = new ArrayList();
+
+ for (ContainerResourceDecreaseProto c : list) {
+ this.newDecreasedContainers.add(convertFromProtoFormat(c));
+ }
+
+ }
+
private NodeIdProto convertToProtoFormat(NodeId nodeId) {
return ((NodeIdPBImpl)nodeId).getProto();
}
@@ -316,6 +390,16 @@ private ContainerStatusProto convertToProtoFormat(ContainerStatus c) {
return ((ContainerStatusPBImpl)c).getProto();
}
+ private ContainerResourceDecreaseProto convertToProtoFormat(
+ ContainerResourceDecrease t) {
+ return ((ContainerResourceDecreasePBImpl) t).getProto();
+ }
+
+ private ContainerResourceDecrease convertFromProtoFormat(
+ ContainerResourceDecreaseProto p) {
+ return new ContainerResourceDecreasePBImpl(p);
+ }
+
private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto c) {
return new ApplicationIdPBImpl(c);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
index 4f5d168..ed93639 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
@@ -36,6 +36,7 @@ message NodeStatusProto {
repeated ContainerStatusProto containersStatuses = 3;
optional NodeHealthStatusProto nodeHealthStatus = 4;
repeated ApplicationIdProto keep_alive_applications = 5;
+ repeated ContainerResourceDecreaseProto new_decreased_containers = 6;
}
message MasterKeyProto {
@@ -47,4 +48,5 @@ message NodeHealthStatusProto {
optional bool is_node_healthy = 1;
optional string health_report = 2;
optional int64 last_health_report_time = 3;
-}
\ No newline at end of file
+}
+
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestNodeStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestNodeStatus.java
new file mode 100644
index 0000000..06202f3
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestNodeStatus.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.protocolrecords;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.yarn.api.records.ContainerResourceDecrease;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto;
+import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+import org.apache.hadoop.yarn.server.api.records.impl.pb.NodeStatusPBImpl;
+import org.junit.Test;
+
+public class TestNodeStatus {
+ @Test
+ public void testNodeStatusWithDecreasedContainers() {
+ RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+ NodeStatus nodeStatus = recordFactory.newRecordInstance(NodeStatus.class);
+
+ // add 3 instances
+ List list =
+ new ArrayList();
+ list.add(ContainerResourceDecrease.newInstance(null, null));
+ list.add(ContainerResourceDecrease.newInstance(null, null));
+ list.add(ContainerResourceDecrease.newInstance(null, null));
+ nodeStatus.setDecreasedContainers(list);
+
+ // serde
+ NodeStatusProto proto = ((NodeStatusPBImpl) nodeStatus).getProto();
+ nodeStatus = new NodeStatusPBImpl(proto);
+
+ // check value
+ Assert.assertEquals(list.size(), nodeStatus.getDecreasedContainers()
+ .size());
+ }
+
+ @Test
+ public void testNodeStatusWithoutDecreasedContainers() {
+ RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+ NodeStatus nodeStatus = recordFactory.newRecordInstance(NodeStatus.class);
+
+ // serde
+ NodeStatusProto proto = ((NodeStatusPBImpl) nodeStatus).getProto();
+ nodeStatus = new NodeStatusPBImpl(proto);
+
+ // check value
+ Assert.assertEquals(0, nodeStatus.getDecreasedContainers().size());
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
index 729e043..b690d53 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
@@ -18,11 +18,13 @@
package org.apache.hadoop.yarn.server.nodemanager;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerResourceDecrease;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
@@ -66,4 +68,6 @@
LocalDirsHandlerService getLocalDirsHandler();
ApplicationACLsManager getApplicationACLsManager();
+
+ BlockingQueue getDecreasedContainers();
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
index a169c12..7444a2b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
@@ -19,9 +19,11 @@
package org.apache.hadoop.yarn.server.nodemanager;
import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
@@ -40,6 +42,7 @@
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerResourceDecrease;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
@@ -242,6 +245,8 @@ public void run() {
new ConcurrentHashMap();
private final ConcurrentMap containers =
new ConcurrentSkipListMap();
+ private final BlockingQueue decreasedContainers =
+ new LinkedBlockingQueue();
private final NMContainerTokenSecretManager containerTokenSecretManager;
private final NMTokenSecretManagerInNM nmTokenSecretManager;
@@ -328,6 +333,11 @@ public LocalDirsHandlerService getLocalDirsHandler() {
public ApplicationACLsManager getApplicationACLsManager() {
return aclsManager;
}
+
+ @Override
+ public BlockingQueue getDecreasedContainers() {
+ return decreasedContainers;
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index aaf6ceb..e392b94 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -28,6 +28,7 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.NoSuchElementException;
import java.util.Random;
import org.apache.commons.logging.Log;
@@ -40,6 +41,7 @@
import org.apache.hadoop.util.VersionUtil;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerResourceDecrease;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
@@ -338,10 +340,25 @@ public NodeStatus getNodeStatusAndUpdateContainersInContext(
LOG.debug(this.nodeId + " sending out status for "
+ containersStatuses.size() + " containers");
NodeStatus nodeStatus = NodeStatus.newInstance(nodeId, responseId,
- containersStatuses, createKeepAliveApplicationList(), nodeHealthStatus);
+ containersStatuses, createKeepAliveApplicationList(), nodeHealthStatus,
+ pullDecreasedContainers());
return nodeStatus;
}
+
+ private List pullDecreasedContainers() {
+ List decreasedContainers =
+ new ArrayList();
+ ContainerResourceDecrease item = null;
+ try {
+ while (null != (item = context.getDecreasedContainers().remove())) {
+ decreasedContainers.add(item);
+ }
+ } catch (NoSuchElementException e) {
+ // do nothing
+ }
+ return decreasedContainers;
+ }
/*
* It will return current container statuses. If any container has
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index dd3deb3..52f7fb2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -55,6 +55,8 @@
import org.apache.hadoop.service.ServiceStateChangeListener;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
@@ -65,9 +67,11 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerResourceDecrease;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.SerializedException;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
@@ -112,6 +116,7 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.NonAggregatingLogHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerChangeMonitoringEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
@@ -907,4 +912,125 @@ public Context getContext() {
public Map getAuxServiceMetaData() {
return this.auxiliaryServices.getMetaData();
}
+
+ @Private
+ @VisibleForTesting
+ protected void authorizeIncreaseRequest(NMTokenIdentifier nmTokenIdentifier,
+ ContainerTokenIdentifier containerTokenIdentifier) throws YarnException {
+ ContainerId containerId = containerTokenIdentifier.getContainerID();
+ boolean unauthorized = false;
+ StringBuilder messageBuilder =
+ new StringBuilder("Unauthorized request to change container. ");
+ if (!nmTokenIdentifier.getApplicationAttemptId().equals(
+ containerId.getApplicationAttemptId())) {
+ unauthorized = true;
+ messageBuilder.append("\nNMToken for application attempt : ")
+ .append(nmTokenIdentifier.getApplicationAttemptId())
+ .append(" was used for starting container with container token")
+ .append(" issued for application attempt : ")
+ .append(containerId.getApplicationAttemptId());
+ }
+ if (unauthorized) {
+ String msg = messageBuilder.toString();
+ LOG.error(msg);
+ throw RPCUtil.getRemoteException(msg);
+ }
+ }
+
+ private void internalChangeContainerResource(ContainerId containerId,
+ Resource resource, List succeedChangedContainers,
+ List failedChangedContainers,
+ ContainerResourceDecrease decrease) throws YarnException {
+ // check container's existence
+ if (context.getContainers().get(containerId) == null) {
+ LOG.error("failed to increase size of container because"
+ + " this container not existed in this node:"
+ + containerId.toString());
+ failedChangedContainers.add(containerId);
+ return;
+ }
+
+ // check container's state
+ org.apache.hadoop.yarn.server.nodemanager.
+ containermanager.container.ContainerState currentState =
+ context.getContainers().get(containerId).getContainerState();
+ if (currentState != org.apache.hadoop.yarn.server.
+ nodemanager.containermanager.container.ContainerState.RUNNING) {
+ LOG.warn("We can only increase size of container in"
+ + " RUNNING state, containerId=" + containerId.toString()
+ + " current state is:" + currentState.name());
+ failedChangedContainers.add(containerId);
+ return;
+ }
+
+ // create ChangeContainerMonitoringEvent
+ long pmemBytes = resource.getMemory() * 1024 * 1024L;
+ float pmemRatio =
+ getConfig().getFloat(YarnConfiguration.NM_VMEM_PMEM_RATIO,
+ YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
+ long vmemBytes = (long) (pmemRatio * pmemBytes);
+ ContainerChangeMonitoringEvent monitorEvent =
+ new ContainerChangeMonitoringEvent(containerId, vmemBytes, pmemBytes);
+ containersMonitor.handle(monitorEvent);
+
+ // We can consider the change successful
+ succeedChangedContainers.add(containerId);
+
+ // if it's a succeed decreased container, add it to context, this will be
+ // used by NodeStatusUpdater
+ if (null != decrease) {
+ if (!context.getDecreasedContainers().offer(decrease)) {
+ LOG.error("Failed to decreased container"
+ + " to queue in NMContext, this shouldn't happen.");
+ throw new YarnException("Failed to decreased container"
+ + " to queue in NMContext, this shouldn't happen.");
+ }
+ }
+ }
+
+ @Override
+ public ChangeContainersResourceResponse changeContainersResource(
+ ChangeContainersResourceRequest request) throws YarnException,
+ IOException {
+ // get NMToken identifier
+ UserGroupInformation remoteUgi = getRemoteUgi();
+ NMTokenIdentifier nmIdentifier = selectNMTokenIdentifier(remoteUgi);
+
+ List succeedChangedContainers = new ArrayList();
+ List failedChangedContainers = new ArrayList();
+
+ // loop all decrease request, try to change their resource limits
+ for (ContainerResourceDecrease r : request.getContainersToDecrease()) {
+ if (r.getContainerId() != null && r.getCapability() != null) {
+ internalChangeContainerResource(r.getContainerId(), r.getCapability(),
+ succeedChangedContainers, failedChangedContainers, r);
+ }
+ }
+
+ // loop all increase request to check
+ for (org.apache.hadoop.yarn.api.records.Token token : request
+ .getContainersToIncrease()) {
+ ContainerTokenIdentifier containerTokenIdentifier =
+ BuilderUtils.newContainerTokenIdentifier(token);
+
+ ContainerId containerId = containerTokenIdentifier.getContainerID();
+ Resource resource = containerTokenIdentifier.getResource();
+ try {
+ authorizeIncreaseRequest(nmIdentifier, containerTokenIdentifier);
+ } catch (YarnException e) {
+ failedChangedContainers.add(containerTokenIdentifier.getContainerID());
+ continue;
+ }
+
+ // change container monitoring size, pass null in last parameter to
+ // indicate it's not a decrease container request
+ internalChangeContainerResource(containerId, resource,
+ succeedChangedContainers, failedChangedContainers, null);
+ }
+
+ ChangeContainersResourceResponse response =
+ ChangeContainersResourceResponse.newInstance(succeedChangedContainers,
+ failedChangedContainers);
+ return response;
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerChangeMonitoringEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerChangeMonitoringEvent.java
new file mode 100644
index 0000000..cb7bdb2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerChangeMonitoringEvent.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+public class ContainerChangeMonitoringEvent extends ContainersMonitorEvent {
+ private final long vmemLimit;
+ private final long pmemLimit;
+
+ public ContainerChangeMonitoringEvent(ContainerId containerId,
+ long vmemLimit, long pmemLimit) {
+ super(containerId, ContainersMonitorEventType.CHANGE_MONITORING_CONTAINER);
+ this.vmemLimit = vmemLimit;
+ this.pmemLimit = pmemLimit;
+ }
+
+ public long getVmemLimit() {
+ return this.vmemLimit;
+ }
+
+ public long getPmemLimit() {
+ return this.pmemLimit;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorEventType.java
index be99651..9d73745 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorEventType.java
@@ -20,5 +20,6 @@
public enum ContainersMonitorEventType {
START_MONITORING_CONTAINER,
- STOP_MONITORING_CONTAINER
+ STOP_MONITORING_CONTAINER,
+ CHANGE_MONITORING_CONTAINER
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
index b681b34..8ba371a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
@@ -55,6 +55,7 @@
final Map containersToBeAdded;
Map trackingContainers =
new HashMap();
+ final List containersToBeChanged;
final ContainerExecutor containerExecutor;
private final Dispatcher eventDispatcher;
@@ -81,6 +82,7 @@ public ContainersMonitorImpl(ContainerExecutor exec,
this.containersToBeAdded = new HashMap();
this.containersToBeRemoved = new ArrayList();
+ this.containersToBeChanged = new ArrayList();
this.monitoringThread = new MonitoringThread();
}
@@ -195,7 +197,7 @@ protected void serviceStop() throws Exception {
super.serviceStop();
}
- private static class ProcessTreeInfo {
+ static class ProcessTreeInfo {
private ContainerId containerId;
private String pid;
private ResourceCalculatorProcessTree pTree;
@@ -306,6 +308,19 @@ boolean isProcessTreeOverLimit(ResourceCalculatorProcessTree pTree,
return isProcessTreeOverLimit(containerId, currentMemUsage,
curMemUsageOfAgedProcesses, limit);
}
+
+ static class ContainerResourceToBeChanged {
+ public ContainerResourceToBeChanged(ContainerId containerId,
+ long vmemLimit, long pmemLimit) {
+ this.containerId = containerId;
+ this.vmemLimit = vmemLimit;
+ this.pmemLimit = pmemLimit;
+ }
+
+ ContainerId containerId;
+ long vmemLimit;
+ long pmemLimit;
+ }
private class MonitoringThread extends Thread {
public MonitoringThread() {
@@ -348,6 +363,25 @@ public void run() {
}
containersToBeRemoved.clear();
}
+
+ // handle containers to be changed
+ synchronized (containersToBeChanged) {
+ for (ContainerResourceToBeChanged c : containersToBeChanged) {
+ // if c is not in trackingContainers, it maybe removed (finished or
+ // killed) already, just skip
+ if (trackingContainers.get(c.containerId) == null) {
+ LOG.warn("note container not found in trackingContainers, "
+ + "it maybe already completed, please check, Container="
+ + c.containerId.toString()
+ + " that was to be resized is no longer running");
+ continue;
+ }
+ ProcessTreeInfo info = trackingContainers.get(c.containerId);
+ info.vmemLimit = c.vmemLimit;
+ info.pmemLimit = c.pmemLimit;
+ }
+ containersToBeChanged.clear();
+ }
// Now do the monitoring for the trackingContainers
// Check memory usage and kill any overflowing containers
@@ -547,6 +581,13 @@ public void handle(ContainersMonitorEvent monitoringEvent) {
this.containersToBeRemoved.add(containerId);
}
break;
+ case CHANGE_MONITORING_CONTAINER:
+ ContainerChangeMonitoringEvent event =
+ (ContainerChangeMonitoringEvent) monitoringEvent;
+ synchronized (this.containersToBeChanged) {
+ this.containersToBeChanged.add(new ContainerResourceToBeChanged(
+ containerId, event.getVmemLimit(), event.getPmemLimit()));
+ }
default:
// TODO: Wrong event.
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java
index a47e7f7..ebfc603 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java
@@ -203,4 +203,10 @@ protected ContainerExecutor createContainerExecutor() {
linuxContainerExecutor.setConf(super.conf);
return linuxContainerExecutor;
}
+
+ @Override
+ public void testContainersChangeResource() {
+ LOG.info("not running this test.");
+ return;
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
index f62cd50..6ba2371 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
@@ -40,6 +40,8 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
@@ -791,6 +793,69 @@ public void testStartContainerFailureWithUnknownAuxService() throws Exception {
Assert.assertTrue(response.getFailedRequests().get(cId).getMessage()
.contains("The auxService:" + serviceName + " does not exist"));
}
+
+ @Test
+ public void testContainersChangeResource() throws Exception {
+ containerManager.start();
+
+ // first we will start containers 0..4
+ List list = new ArrayList();
+ ContainerLaunchContext containerLaunchContext = recordFactory
+ .newRecordInstance(ContainerLaunchContext.class);
+ for (int i = 0; i < 4; i++) {
+ ContainerId cId = createContainerId(i);
+ long identifier = DUMMY_RM_IDENTIFIER;
+ Token containerToken = createContainerToken(cId, identifier,
+ context.getNodeId(), user, context.getContainerTokenSecretManager());
+ StartContainerRequest request = StartContainerRequest.newInstance(
+ containerLaunchContext, containerToken);
+ list.add(request);
+ }
+ StartContainersRequest requestList = StartContainersRequest
+ .newInstance(list);
+
+ StartContainersResponse response = containerManager
+ .startContainers(requestList);
+
+ Assert.assertEquals(4, response.getSuccessfullyStartedContainers().size());
+ for (int i = 0; i < response.getSuccessfullyStartedContainers().size(); i++) {
+ Assert.assertEquals(i, response.getSuccessfullyStartedContainers().get(i)
+ .getId());
+ }
+
+ // we will create some change request,
+ List increaseTokens = new ArrayList();
+
+ // add change request for container-0, the resource not match because
+ // container's state in CM will not be RUNNING
+ ContainerId cId = createContainerId(0);
+ long identifier = DUMMY_RM_IDENTIFIER;
+ Token containerToken =
+ createContainerToken(cId, identifier, context.getNodeId(), user,
+ context.getContainerTokenSecretManager(),
+ Resource.newInstance(9999, 3));
+ increaseTokens.add(containerToken);
+
+ // add change request for container-7, the resource not match because
+ // container not existed
+ cId = createContainerId(7);
+ identifier = DUMMY_RM_IDENTIFIER;
+ containerToken =
+ createContainerToken(cId, identifier, context.getNodeId(), user,
+ context.getContainerTokenSecretManager(),
+ Resource.newInstance(9999, 3));
+ increaseTokens.add(containerToken);
+
+ ChangeContainersResourceRequest changeRequest = ChangeContainersResourceRequest
+ .newInstance(increaseTokens, null);
+ ChangeContainersResourceResponse changeResponse = containerManager
+ .changeContainersResource(changeRequest);
+
+ // check response
+ Assert.assertEquals(2, changeResponse.getFailedChangedContainers().size());
+ Assert.assertEquals(0, changeResponse.getFailedChangedContainers().get(0).getId());
+ Assert.assertEquals(7, changeResponse.getFailedChangedContainers().get(1).getId());
+ }
public static Token createContainerToken(ContainerId cId, long rmIdentifier,
NodeId nodeId, String user,
@@ -807,4 +872,19 @@ public static Token createContainerToken(ContainerId cId, long rmIdentifier,
containerTokenIdentifier);
return containerToken;
}
+
+ public static Token createContainerToken(ContainerId cId, long rmIdentifier,
+ NodeId nodeId, String user,
+ NMContainerTokenSecretManager containerTokenSecretManager, Resource resource)
+ throws IOException {
+ ContainerTokenIdentifier containerTokenIdentifier =
+ new ContainerTokenIdentifier(cId, nodeId.toString(), user, resource,
+ System.currentTimeMillis() + 100000L, 123, rmIdentifier);
+ Token containerToken =
+ BuilderUtils
+ .newContainerToken(nodeId, containerTokenSecretManager
+ .retrievePassword(containerTokenIdentifier),
+ containerTokenIdentifier);
+ return containerToken;
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/MockResourceCalculatorPlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/MockResourceCalculatorPlugin.java
new file mode 100644
index 0000000..b560f40
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/MockResourceCalculatorPlugin.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
+
+import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
+
+public class MockResourceCalculatorPlugin extends ResourceCalculatorPlugin {
+ @Override
+ public long getVirtualMemorySize() {
+ return 0;
+ }
+
+ @Override
+ public long getPhysicalMemorySize() {
+ return 0;
+ }
+
+ @Override
+ public long getAvailableVirtualMemorySize() {
+ return 0;
+ }
+
+ @Override
+ public long getAvailablePhysicalMemorySize() {
+ return 0;
+ }
+
+ @Override
+ public int getNumProcessors() {
+ return 0;
+ }
+
+ @Override
+ public long getCpuFrequency() {
+ return 0;
+ }
+
+ @Override
+ public long getCumulativeCpuTime() {
+ return 0;
+ }
+
+ @Override
+ public float getCpuUsage() {
+ return 0;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/MockResourceCalculatorProcessTree.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/MockResourceCalculatorProcessTree.java
new file mode 100644
index 0000000..ace74a8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/MockResourceCalculatorProcessTree.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
+
+import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
+
+public class MockResourceCalculatorProcessTree extends ResourceCalculatorProcessTree {
+ long pmem = 0;
+ long vmem = 0;
+
+ public MockResourceCalculatorProcessTree(String root) {
+ super(root);
+ }
+
+ @Override
+ public void updateProcessTree() {
+ }
+
+ @Override
+ public String getProcessTreeDump() {
+ return "";
+ }
+
+ @Override
+ public long getCumulativeVmem(int olderThanAge) {
+ return olderThanAge > 0 ? 0 : vmem;
+ }
+
+ @Override
+ public long getCumulativeRssmem(int olderThanAge) {
+ return olderThanAge > 0 ? 0 : pmem;
+ }
+
+ @Override
+ public long getCumulativeCpuTime() {
+ return 0;
+ }
+
+ @Override
+ public boolean checkPidPgrpidForMatch() {
+ return true;
+ }
+
+ public void setPmem(long pmem) {
+ this.pmem = pmem;
+ }
+
+ public void setVmem(long vmem) {
+ this.vmem = vmem;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java
new file mode 100644
index 0000000..8f5beeb
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl.ProcessTreeInfo;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestContainersMonitorResourceChange {
+ ContainersMonitorImpl cm;
+ MockExecutor executor;
+ Configuration conf;
+ AsyncDispatcher dispatcher;
+ MockContainerEventHandler containerEventHandler;
+
+ static class MockExecutor extends ContainerExecutor {
+ ConcurrentMap containerIdToPid =
+ new ConcurrentHashMap();
+
+ @Override
+ public void init() throws IOException {
+ }
+
+ @Override
+ public void startLocalizer(Path nmPrivateContainerTokens,
+ InetSocketAddress nmAddr, String user, String appId, String locId,
+ List localDirs, List logDirs) throws IOException,
+ InterruptedException {
+ }
+
+ @Override
+ public int launchContainer(Container container,
+ Path nmPrivateContainerScriptPath, Path nmPrivateTokensPath,
+ String user, String appId, Path containerWorkDir,
+ List localDirs, List logDirs) throws IOException {
+ return 0;
+ }
+
+ @Override
+ public boolean signalContainer(String user, String pid, Signal signal)
+ throws IOException {
+ return true;
+ }
+
+ @Override
+ public void deleteAsUser(String user, Path subDir, Path... basedirs)
+ throws IOException, InterruptedException {
+ }
+
+ @Override
+ public String getProcessId(ContainerId containerID) {
+ return String.valueOf(containerID.getId());
+ }
+
+ public void setContainerPid(ContainerId containerId, String pid) {
+ containerIdToPid.put(containerId, pid);
+ }
+ }
+
+ static class MockContainerEventHandler implements
+ EventHandler {
+ Set killedContainer = new HashSet();
+
+ @Override
+ public void handle(ContainerEvent event) {
+ if (event.getType() == ContainerEventType.KILL_CONTAINER) {
+ synchronized (killedContainer) {
+ killedContainer.add(event.getContainerID());
+ }
+ }
+ }
+
+ public boolean isContainerKilled(ContainerId containerId) {
+ synchronized (killedContainer) {
+ return killedContainer.contains(containerId);
+ }
+ }
+ }
+
+ @Before
+ public void setup() {
+ executor = new MockExecutor();
+ dispatcher = new AsyncDispatcher();
+ conf = new Configuration();
+ conf.set(
+ YarnConfiguration.NM_CONTAINER_MON_RESOURCE_CALCULATOR,
+ MockResourceCalculatorPlugin.class.getCanonicalName());
+ conf.set(
+ YarnConfiguration.NM_CONTAINER_MON_PROCESS_TREE,
+ MockResourceCalculatorProcessTree.class.getCanonicalName());
+ conf.setLong(YarnConfiguration.NM_CONTAINER_MON_INTERVAL_MS, 20L);
+ dispatcher.init(conf);
+ dispatcher.start();
+ containerEventHandler = new MockContainerEventHandler();
+ dispatcher.register(ContainerEventType.class, containerEventHandler);
+ cm = new ContainersMonitorImpl(executor, dispatcher, null);
+ cm.init(conf);
+ cm.start();
+ }
+
+ @After
+ public void finalize() {
+ try {
+ cm.stop();
+ } catch (Exception e) {
+ // do nothing
+ }
+ try {
+ dispatcher.stop();
+ } catch (Exception e) {
+ // do nothing
+ }
+ }
+
+ private ContainerId getContainerId(int id) {
+ return ContainerId.newInstance(ApplicationAttemptId.newInstance(
+ ApplicationId.newInstance(123456L, 1), 1), id);
+ }
+
+ private ProcessTreeInfo getProcessTreeInfo(ContainerId id) {
+ return cm.trackingContainers.get(id);
+ }
+
+ @Test
+ public void testResourceChange() throws Exception {
+ // create a container-1 with
+ cm.handle(new ContainerStartMonitoringEvent(
+ getContainerId(1), 2100L, 1000L));
+
+ // check if this container is tracked and it's value
+ Thread.sleep(200);
+ Assert.assertNotNull(getProcessTreeInfo(getContainerId(1)));
+ Assert.assertEquals(1000L, getProcessTreeInfo(getContainerId(1))
+ .getPmemLimit());
+ Assert.assertEquals(2100L, getProcessTreeInfo(getContainerId(1))
+ .getVmemLimit());
+
+ // increase size of pmem usage, it will be killed
+ MockResourceCalculatorProcessTree mockTree =
+ (MockResourceCalculatorProcessTree) getProcessTreeInfo(
+ getContainerId(1)).getProcessTree();
+ mockTree.setPmem(2500L);
+
+ // check if this container killed
+ Thread.sleep(200);
+ Assert.assertTrue(containerEventHandler
+ .isContainerKilled(getContainerId(1)));
+
+ // create a container-2 with
+ cm.handle(new ContainerStartMonitoringEvent(
+ getContainerId(2), 2100L, 1000L));
+
+ // check if this container is tracked and it's value
+ Thread.sleep(200);
+ Assert.assertNotNull(getProcessTreeInfo(getContainerId(2)));
+ Assert.assertEquals(1000L, getProcessTreeInfo(getContainerId(2))
+ .getPmemLimit());
+ Assert.assertEquals(2100L, getProcessTreeInfo(getContainerId(2))
+ .getVmemLimit());
+
+ // trigger a change resource event, check limit after changed
+ cm.handle(new ContainerChangeMonitoringEvent(getContainerId(2), 4200L,
+ 2000L));
+ Thread.sleep(200);
+ Assert.assertNotNull(getProcessTreeInfo(getContainerId(2)));
+ Assert.assertEquals(2000L, getProcessTreeInfo(getContainerId(2))
+ .getPmemLimit());
+ Assert.assertEquals(4200L, getProcessTreeInfo(getContainerId(2))
+ .getVmemLimit());
+
+ // increase size of pmem usage, it should NOT be killed
+ mockTree =
+ (MockResourceCalculatorProcessTree) getProcessTreeInfo(
+ getContainerId(2)).getProcessTree();
+ mockTree.setPmem(2500L);
+
+ // check if this container killed
+ Thread.sleep(200);
+ Assert.assertFalse(containerEventHandler
+ .isContainerKilled(getContainerId(2)));
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java
index c9e57a6..88bcf95 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java
@@ -31,6 +31,8 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
@@ -308,4 +310,11 @@ synchronized public GetContainerStatusesResponse getContainerStatuses(
nodeStatus.setNodeHealthStatus(nodeHealthStatus);
return nodeStatus;
}
+
+ @Override
+ public ChangeContainersResourceResponse changeContainersResource(
+ ChangeContainersResourceRequest request) throws YarnException,
+ IOException {
+ return null;
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAMAuthorization.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAMAuthorization.java
index a9f1c1a..6f9a80f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAMAuthorization.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAMAuthorization.java
@@ -39,6 +39,8 @@
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
@@ -129,6 +131,13 @@ public Credentials getContainerCredentials() throws IOException {
credentials.readTokenStorageStream(buf);
return credentials;
}
+
+ @Override
+ public ChangeContainersResourceResponse changeContainersResource(
+ ChangeContainersResourceRequest request) throws YarnException,
+ IOException {
+ return null;
+ }
}
public static class MockRMWithAMS extends MockRMWithCustomAMLauncher {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java
index 64e5cc9..90fe1bf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java
@@ -29,6 +29,8 @@
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ChangeContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
@@ -118,6 +120,13 @@ public GetContainerStatusesResponse getContainerStatuses(
GetContainerStatusesRequest request) throws YarnException {
return null;
}
+
+ @Override
+ public ChangeContainersResourceResponse changeContainersResource(
+ ChangeContainersResourceRequest request) throws YarnException,
+ IOException {
+ return null;
+ }
}
@Test