diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerError.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerError.java
index e7458cf..4d184cb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerError.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerError.java
@@ -59,6 +59,22 @@ public static UpdateContainerError newInstance(String reason,
public abstract void setReason(String reason);
/**
+ * Get current container version.
+ * @return Current container Version.
+ */
+ @InterfaceAudience.Public
+ @InterfaceStability.Unstable
+ public abstract int getCurrentContainerVersion();
+
+ /**
+ * Set current container version.
+ * @param currentVersion Current container version.
+ */
+ @InterfaceAudience.Public
+ @InterfaceStability.Unstable
+ public abstract void setCurrentContainerVersion(int currentVersion);
+
+ /**
* Get the {@code UpdateContainerRequest} that was not satisfiable.
* @return UpdateContainerRequest
*/
@@ -89,6 +105,7 @@ public int hashCode() {
@Override
public String toString() {
return "UpdateContainerError{reason=" + getReason() + ", "
+ + "currentVersion=" + getCurrentContainerVersion() + ", "
+ "req=" + getUpdateContainerRequest() + "}";
}
@@ -120,6 +137,6 @@ public boolean equals(Object obj) {
} else if (!req.equals(other.getUpdateContainerRequest())) {
return false;
}
- return true;
+ return getCurrentContainerVersion() == other.getCurrentContainerVersion();
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
index df3c852..c6647c8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
@@ -78,6 +78,7 @@ message UpdateContainerRequestProto {
message UpdateContainerErrorProto {
optional string reason = 1;
optional UpdateContainerRequestProto update_request = 2;
+ optional int32 current_container_version = 3;
}
message AllocateRequestProto {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
index 52155f5..15d0065 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
@@ -33,17 +33,20 @@
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
import org.apache.hadoop.yarn.exceptions.YarnException;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
+import org.apache.hadoop.yarn.util.resource.Resources;
@InterfaceAudience.Public
@InterfaceStability.Stable
@@ -518,12 +521,38 @@ public abstract void unregisterApplicationMaster(FinalApplicationStatus appStatu
* ResourceManager to change the existing resource allocation to the target
* resource allocation.
*
+ * @deprecated use
+ * {@link #requestContainerUpdate(Container, UpdateContainerRequest)}
+ *
* @param container The container returned from the last successful resource
* allocation or resource change
* @param capability The target resource capability of the container
*/
- public abstract void requestContainerResourceChange(
- Container container, Resource capability);
+ @Deprecated
+ public void requestContainerResourceChange(
+ Container container, Resource capability) {
+ Preconditions.checkNotNull(container, "Container cannot be null!!");
+ Preconditions.checkNotNull(capability,
+ "UpdateContainerRequest cannot be null!!");
+ requestContainerUpdate(container, UpdateContainerRequest.newInstance(
+ container.getVersion(), container.getId(),
+ Resources.fitsIn(capability, container.getResource()) ?
+ ContainerUpdateType.DECREASE_RESOURCE :
+ ContainerUpdateType.INCREASE_RESOURCE,
+ capability, null));
+ }
+
+ /**
+ * Request a container update before calling allocate.
+ * Any previous pending update request of the same container will be
+ * removed.
+ *
+ * @param container The container returned from the last successful resource
+ * allocation or update
+ * @param updateContainerRequest The UpdateContainerRequest.
+ */
+ public abstract void requestContainerUpdate(
+ Container container, UpdateContainerRequest updateContainerRequest);
/**
* Release containers assigned by the Resource Manager. If the app cannot use
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
index d2195a6..4cb27cd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
@@ -36,11 +36,13 @@
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.api.records.UpdatedContainer;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
@@ -50,6 +52,7 @@
import org.apache.hadoop.yarn.exceptions.YarnException;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.yarn.util.resource.Resources;
/**
* AMRMClientAsync handles communication with the ResourceManager
@@ -284,12 +287,38 @@ public abstract void unregisterApplicationMaster(
* ResourceManager to change the existing resource allocation to the target
* resource allocation.
*
+ * @deprecated use
+ * {@link #requestContainerUpdate(Container, UpdateContainerRequest)}
+ *
* @param container The container returned from the last successful resource
* allocation or resource change
* @param capability The target resource capability of the container
*/
- public abstract void requestContainerResourceChange(
- Container container, Resource capability);
+ @Deprecated
+ public void requestContainerResourceChange(
+ Container container, Resource capability) {
+ Preconditions.checkNotNull(container, "Container cannot be null!!");
+ Preconditions.checkNotNull(capability,
+ "UpdateContainerRequest cannot be null!!");
+ requestContainerUpdate(container, UpdateContainerRequest.newInstance(
+ container.getVersion(), container.getId(),
+ Resources.fitsIn(capability, container.getResource()) ?
+ ContainerUpdateType.DECREASE_RESOURCE :
+ ContainerUpdateType.INCREASE_RESOURCE,
+ capability, null));
+ }
+
+ /**
+ * Request a container update before calling allocate.
+ * Any previous pending update request of the same container will be
+ * removed.
+ *
+ * @param container The container returned from the last successful resource
+ * allocation or update
+ * @param updateContainerRequest The UpdateContainerRequest.
+ */
+ public abstract void requestContainerUpdate(
+ Container container, UpdateContainerRequest updateContainerRequest);
/**
* Release containers assigned by the Resource Manager. If the app cannot use
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
index 3dd53d3..5274fd0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
@@ -25,6 +25,7 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
+import com.google.common.base.Preconditions;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -35,10 +36,12 @@
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.api.records.UpdatedContainer;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
@@ -50,6 +53,7 @@
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.yarn.util.resource.Resources;
@Private
@Unstable
@@ -207,9 +211,9 @@ public void removeContainerRequest(T req) {
}
@Override
- public void requestContainerResourceChange(
- Container container, Resource capability) {
- client.requestContainerResourceChange(container, capability);
+ public void requestContainerUpdate(Container container,
+ UpdateContainerRequest updateContainerRequest) {
+ client.requestContainerUpdate(container, updateContainerRequest);
}
/**
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
index 44fc1e0..7da91de 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
@@ -169,15 +169,16 @@ static boolean canFit(Resource arg0, Resource arg1) {
protected Set pendingRelease = new TreeSet();
// change map holds container resource change requests between two allocate()
// calls, and are cleared after each successful allocate() call.
- protected final Map> change =
- new HashMap<>();
+ protected final Map> change = new HashMap<>();
// pendingChange map holds history of container resource change requests in
// case AM needs to reregister with the ResourceManager.
// Change requests are removed from this map if RM confirms the change
// through allocate response, or if RM confirms that the container has been
// completed.
- protected final Map>
- pendingChange = new HashMap<>();
+ protected final Map> pendingChange =
+ new HashMap<>();
public AMRMClientImpl() {
super(AMRMClientImpl.class.getName());
@@ -259,7 +260,7 @@ public AllocateResponse allocate(float progressIndicator)
AllocateRequest allocateRequest = null;
List blacklistToAdd = new ArrayList();
List blacklistToRemove = new ArrayList();
- Map> oldChange =
+ Map> oldChange =
new HashMap<>();
try {
synchronized (this) {
@@ -374,14 +375,15 @@ public AllocateResponse allocate(float progressIndicator)
//
// Only insert entries from the cached oldChange map
// that do not exist in the current change map:
- for (Map.Entry> entry :
+ for (Map.Entry> entry :
oldChange.entrySet()) {
ContainerId oldContainerId = entry.getKey();
Container oldContainer = entry.getValue().getKey();
- Resource oldResource = entry.getValue().getValue();
+ UpdateContainerRequest oldupdate = entry.getValue().getValue();
if (change.get(oldContainerId) == null) {
change.put(
- oldContainerId, new SimpleEntry<>(oldContainer, oldResource));
+ oldContainerId, new SimpleEntry<>(oldContainer, oldupdate));
}
}
blacklistAdditions.addAll(blacklistToAdd);
@@ -394,19 +396,17 @@ public AllocateResponse allocate(float progressIndicator)
private List createUpdateList() {
List updateList = new ArrayList<>();
- for (Map.Entry> entry :
- change.entrySet()) {
- Resource targetCapability = entry.getValue().getValue();
- Resource currCapability = entry.getValue().getKey().getResource();
- int version = entry.getValue().getKey().getVersion();
+ for (Map.Entry> entry : change.entrySet()) {
+ Resource targetCapability = entry.getValue().getValue().getCapability();
+ ExecutionType targetExecType =
+ entry.getValue().getValue().getExecutionType();
ContainerUpdateType updateType =
- ContainerUpdateType.INCREASE_RESOURCE;
- if (Resources.fitsIn(targetCapability, currCapability)) {
- updateType = ContainerUpdateType.DECREASE_RESOURCE;
- }
+ entry.getValue().getValue().getContainerUpdateType();
+ int version = entry.getValue().getKey().getVersion();
updateList.add(
UpdateContainerRequest.newInstance(version, entry.getKey(),
- updateType, targetCapability, null));
+ updateType, targetCapability, targetExecType));
}
return updateList;
}
@@ -591,21 +591,47 @@ public synchronized void removeContainerRequest(T req) {
}
@Override
- public synchronized void requestContainerResourceChange(
- Container container, Resource capability) {
- validateContainerResourceChangeRequest(
- container.getId(), container.getResource(), capability);
+ public synchronized void requestContainerUpdate(
+ Container container, UpdateContainerRequest updateContainerRequest) {
+ Preconditions.checkNotNull(container, "Container cannot be null!!");
+ Preconditions.checkNotNull(updateContainerRequest,
+ "UpdateContainerRequest cannot be null!!");
+ LOG.info("Requesting Container update : " +
+ "container=" + container + ", " +
+ "updateType=" + updateContainerRequest.getContainerUpdateType() + ", " +
+ "targetCapability=" + updateContainerRequest.getCapability() + ", " +
+ "targetExecType=" + updateContainerRequest.getExecutionType());
+ if (updateContainerRequest.getCapability() != null &&
+ updateContainerRequest.getExecutionType() == null) {
+ validateContainerResourceChangeRequest(
+ updateContainerRequest.getContainerUpdateType(),
+ container.getId(), container.getResource(),
+ updateContainerRequest.getCapability());
+ } else if (updateContainerRequest.getExecutionType() != null &&
+ updateContainerRequest.getCapability() == null) {
+ validateContainerExecTypeChangeRequest(
+ updateContainerRequest.getContainerUpdateType(),
+ container.getId(), container.getExecutionType(),
+ updateContainerRequest.getExecutionType());
+ } else if (updateContainerRequest.getExecutionType() == null &&
+ updateContainerRequest.getCapability() == null) {
+ throw new IllegalArgumentException("Both target Capability and" +
+ "target Execution Type are null");
+ } else {
+ throw new IllegalArgumentException("Support currently exists only for" +
+ " EITHER update of Capability OR update of Execution Type NOT both");
+ }
if (change.get(container.getId()) == null) {
change.put(container.getId(),
- new SimpleEntry<>(container, capability));
+ new SimpleEntry<>(container, updateContainerRequest));
} else {
- change.get(container.getId()).setValue(capability);
+ change.get(container.getId()).setValue(updateContainerRequest);
}
if (pendingChange.get(container.getId()) == null) {
pendingChange.put(container.getId(),
- new SimpleEntry<>(container, capability));
+ new SimpleEntry<>(container, updateContainerRequest));
} else {
- pendingChange.get(container.getId()).setValue(capability);
+ pendingChange.get(container.getId()).setValue(updateContainerRequest);
}
}
@@ -755,7 +781,8 @@ private void checkNodeLabelExpression(T containerRequest) {
}
private void validateContainerResourceChangeRequest(
- ContainerId containerId, Resource original, Resource target) {
+ ContainerUpdateType updateType, ContainerId containerId,
+ Resource original, Resource target) {
Preconditions.checkArgument(containerId != null,
"ContainerId cannot be null");
Preconditions.checkArgument(original != null,
@@ -768,6 +795,36 @@ private void validateContainerResourceChangeRequest(
Preconditions.checkArgument(!Resources.equals(Resources.none(), target)
&& Resources.fitsIn(Resources.none(), target),
"Target resource capability must be greater than 0");
+ if (ContainerUpdateType.DECREASE_RESOURCE == updateType) {
+ Preconditions.checkArgument(Resources.fitsIn(target, original),
+ "Target resource capability must fit in Original capability");
+ } else {
+ Preconditions.checkArgument(Resources.fitsIn(original, target),
+ "Target resource capability must be more than Original capability");
+
+ }
+ }
+
+ private void validateContainerExecTypeChangeRequest(
+ ContainerUpdateType updateType, ContainerId containerId,
+ ExecutionType original, ExecutionType target) {
+ Preconditions.checkArgument(containerId != null,
+ "ContainerId cannot be null");
+ Preconditions.checkArgument(original != null,
+ "Original Execution Type cannot be null");
+ Preconditions.checkArgument(target != null,
+ "Target Execution Type cannot be null");
+ if (ContainerUpdateType.DEMOTE_EXECUTION_TYPE == updateType) {
+ Preconditions.checkArgument(target == ExecutionType.OPPORTUNISTIC
+ && original == ExecutionType.GUARANTEED,
+ "Incorrect Container update request, target should be" +
+ " OPPORTUNISTIC and original should be GUARANTEED");
+ } else {
+ Preconditions.checkArgument(target == ExecutionType.GUARANTEED
+ && original == ExecutionType.OPPORTUNISTIC,
+ "Incorrect Container update request, target should be" +
+ " GUARANTEED and original should be OPPORTUNISTIC");
+ }
}
private void addResourceRequestToAsk(ResourceRequest remoteRequest) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
index 8b1bbc7..d1af869 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
@@ -1063,17 +1063,17 @@ private void doContainerResourceChange(
amClientImpl.requestContainerResourceChange(
container1, Resource.newInstance(4096, 1));
Assert.assertEquals(Resource.newInstance(4096, 1),
- amClientImpl.change.get(container1.getId()).getValue());
+ amClientImpl.change.get(container1.getId()).getValue().getCapability());
// verify new decrease request cancels old increase request for container1
amClientImpl.requestContainerResourceChange(
container1, Resource.newInstance(512, 1));
Assert.assertEquals(Resource.newInstance(512, 1),
- amClientImpl.change.get(container1.getId()).getValue());
+ amClientImpl.change.get(container1.getId()).getValue().getCapability());
// request resource increase for container2
amClientImpl.requestContainerResourceChange(
container2, Resource.newInstance(2048, 1));
Assert.assertEquals(Resource.newInstance(2048, 1),
- amClientImpl.change.get(container2.getId()).getValue());
+ amClientImpl.change.get(container2.getId()).getValue().getCapability());
// verify release request will cancel pending change requests for the same
// container
amClientImpl.requestContainerResourceChange(
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java
index ac77446..39a7633 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java
@@ -40,6 +40,7 @@
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -233,8 +234,11 @@ public void testAMRMClientResendsRequestsOnRMRestart() throws Exception {
nm1.nodeHeartbeat(containerId.getApplicationAttemptId(),
containerId.getContainerId(), ContainerState.RUNNING);
dispatcher.await();
- amClient.requestContainerResourceChange(
- container, Resource.newInstance(2048, 1));
+ amClient.requestContainerUpdate(
+ container, UpdateContainerRequest.newInstance(
+ container.getVersion(), container.getId(),
+ ContainerUpdateType.INCREASE_RESOURCE,
+ Resource.newInstance(2048, 1), null));
it.remove();
allocateResponse = amClient.allocate(0.3f);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java
index 802c207..305d18b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java
@@ -33,6 +33,7 @@
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
@@ -44,6 +45,8 @@
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
+import org.apache.hadoop.yarn.api.records.UpdatedContainer;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.client.api.AMRMClient;
@@ -54,6 +57,9 @@
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler
+ .AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
import org.junit.After;
@@ -66,13 +72,17 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
/**
* Class that tests the allocation of OPPORTUNISTIC containers through the
@@ -83,7 +93,6 @@
private static MiniYARNCluster yarnCluster = null;
private static YarnClient yarnClient = null;
private static List nodeReports = null;
- private static ApplicationAttemptId attemptId = null;
private static int nodeCount = 3;
private static final int ROLLING_INTERVAL_SEC = 13;
@@ -92,12 +101,22 @@
private static Resource capability;
private static Priority priority;
private static Priority priority2;
+ private static Priority priority3;
+ private static Priority priority4;
private static String node;
private static String rack;
private static String[] nodes;
private static String[] racks;
private final static int DEFAULT_ITERATION = 3;
+ // Per test..
+ private ApplicationAttemptId attemptId = null;
+ private AMRMClientImpl amClient = null;
+ private long availMB;
+ private int availVCores;
+ private long allocMB;
+ private int allocVCores;
+
@BeforeClass
public static void setup() throws Exception {
// start minicluster
@@ -106,7 +125,7 @@ public static void setup() throws Exception {
YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS,
ROLLING_INTERVAL_SEC);
conf.setLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, AM_EXPIRE_MS);
- conf.setInt(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 100);
+ conf.setInt(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 1000);
// set the minimum allocation so that resource decrease can go under 1024
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
conf.setBoolean(
@@ -129,7 +148,9 @@ public static void setup() throws Exception {
priority = Priority.newInstance(1);
priority2 = Priority.newInstance(2);
- capability = Resource.newInstance(1024, 1);
+ priority3 = Priority.newInstance(3);
+ priority4 = Priority.newInstance(4);
+ capability = Resource.newInstance(512, 1);
node = nodeReports.get(0).getNodeId().getHost();
rack = nodeReports.get(0).getRackName();
@@ -193,10 +214,35 @@ public void startApp() throws Exception {
UserGroupInformation.getCurrentUser().addToken(appAttempt.getAMRMToken());
appAttempt.getAMRMToken()
.setService(ClientRMProxy.getAMRMTokenService(conf));
+
+ // start am rm client
+ amClient = (AMRMClientImpl)AMRMClient
+ .createAMRMClient();
+
+ //setting an instance NMTokenCache
+ amClient.setNMTokenCache(new NMTokenCache());
+ //asserting we are not using the singleton instance cache
+ Assert.assertNotSame(NMTokenCache.getSingleton(),
+ amClient.getNMTokenCache());
+
+ amClient.init(conf);
+ amClient.start();
+
+ amClient.registerApplicationMaster("Host", 10000, "");
}
@After
public void cancelApp() throws YarnException, IOException {
+ try {
+ amClient
+ .unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null,
+ null);
+ } finally {
+ if (amClient != null &&
+ amClient.getServiceState() == Service.STATE.STARTED) {
+ amClient.stop();
+ }
+ }
yarnClient.killApplication(attemptId.getApplicationId());
attemptId = null;
}
@@ -214,43 +260,254 @@ public static void tearDown() {
}
@Test(timeout = 60000)
- public void testAMRMClient() throws YarnException, IOException {
- AMRMClient amClient = null;
+ public void testPromotionFromAcquired() throws YarnException, IOException {
+ // setup container request
+ assertEquals(0, amClient.ask.size());
+ assertEquals(0, amClient.release.size());
+
+ amClient.addContainerRequest(
+ new AMRMClient.ContainerRequest(capability, null, null, priority2, 0,
+ true, null,
+ ExecutionTypeRequest.newInstance(
+ ExecutionType.OPPORTUNISTIC, true)));
+
+ int oppContainersRequestedAny =
+ amClient.getTable(0).get(priority2, ResourceRequest.ANY,
+ ExecutionType.OPPORTUNISTIC, capability).remoteRequest
+ .getNumContainers();
+
+ assertEquals(1, oppContainersRequestedAny);
+
+ assertEquals(1, amClient.ask.size());
+ assertEquals(0, amClient.release.size());
+
+ // RM should allocate container within 2 calls to allocate()
+ int allocatedContainerCount = 0;
+ Map allocatedOpportContainers = new HashMap<>();
+ int iterationsLeft = 50;
+
+ amClient.getNMTokenCache().clearCache();
+ Assert.assertEquals(0,
+ amClient.getNMTokenCache().numberOfTokensInCache());
+ HashMap receivedNMTokens = new HashMap<>();
+
+ updateMetrics("Before Opp Allocation");
+
+ while (allocatedContainerCount < oppContainersRequestedAny
+ && iterationsLeft-- > 0) {
+ AllocateResponse allocResponse = amClient.allocate(0.1f);
+ assertEquals(0, amClient.ask.size());
+ assertEquals(0, amClient.release.size());
+
+ allocatedContainerCount +=
+ allocResponse.getAllocatedContainers().size();
+ for (Container container : allocResponse.getAllocatedContainers()) {
+ if (container.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
+ allocatedOpportContainers.put(container.getId(), container);
+ removeCR(container);
+ }
+ }
+
+ for (NMToken token : allocResponse.getNMTokens()) {
+ String nodeID = token.getNodeId().toString();
+ receivedNMTokens.put(nodeID, token.getToken());
+ }
+
+ if (allocatedContainerCount < oppContainersRequestedAny) {
+ // sleep to let NM's heartbeat to RM and trigger allocations
+ sleep(100);
+ }
+ }
+
+ assertEquals(oppContainersRequestedAny, allocatedContainerCount);
+ assertEquals(oppContainersRequestedAny, allocatedOpportContainers.size());
+
+ updateMetrics("After Opp Allocation / Before Promotion");
+
try {
- // start am rm client
- amClient = AMRMClient.createAMRMClient();
+ Container c = allocatedOpportContainers.values().iterator().next();
+ amClient.requestContainerUpdate(
+ c, UpdateContainerRequest.newInstance(c.getVersion(),
+ c.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE,
+ null, ExecutionType.OPPORTUNISTIC));
+ Assert.fail("Should throw Exception..");
+ } catch (IllegalArgumentException e) {
+ System.out.println("## " + e.getMessage());
+ Assert.assertTrue(e.getMessage().contains(
+ "target should be GUARANTEED and original should be OPPORTUNISTIC"));
+ }
- //setting an instance NMTokenCache
- amClient.setNMTokenCache(new NMTokenCache());
- //asserting we are not using the singleton instance cache
- Assert.assertNotSame(NMTokenCache.getSingleton(),
- amClient.getNMTokenCache());
+ Container c = allocatedOpportContainers.values().iterator().next();
+ amClient.requestContainerUpdate(
+ c, UpdateContainerRequest.newInstance(c.getVersion(),
+ c.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE,
+ null, ExecutionType.GUARANTEED));
+ iterationsLeft = 120;
+ Map updatedContainers = new HashMap<>();
+ // do a few iterations to ensure RM is not going to send new containers
+ while (iterationsLeft-- > 0 && updatedContainers.isEmpty()) {
+ // inform RM of rejection
+ AllocateResponse allocResponse = amClient.allocate(0.1f);
+ // RM did not send new containers because AM does not need any
+ if (allocResponse.getUpdatedContainers() != null) {
+ for (UpdatedContainer updatedContainer : allocResponse
+ .getUpdatedContainers()) {
+ System.out.println("Got update..");
+ updatedContainers.put(updatedContainer.getContainer().getId(),
+ updatedContainer);
+ }
+ }
+ if (iterationsLeft > 0) {
+ // sleep to make sure NM's heartbeat
+ sleep(100);
+ }
+ }
- amClient.init(conf);
- amClient.start();
+ updateMetrics("After Promotion");
+
+ assertEquals(1, updatedContainers.size());
+ for (ContainerId cId : allocatedOpportContainers.keySet()) {
+ Container orig = allocatedOpportContainers.get(cId);
+ UpdatedContainer updatedContainer = updatedContainers.get(cId);
+ assertNotNull(updatedContainer);
+ assertEquals(ExecutionType.GUARANTEED,
+ updatedContainer.getContainer().getExecutionType());
+ assertEquals(orig.getResource(),
+ updatedContainer.getContainer().getResource());
+ assertEquals(orig.getNodeId(),
+ updatedContainer.getContainer().getNodeId());
+ assertEquals(orig.getVersion() + 1,
+ updatedContainer.getContainer().getVersion());
+ }
+ assertEquals(0, amClient.ask.size());
+ assertEquals(0, amClient.release.size());
+ amClient.ask.clear();
+ }
- amClient.registerApplicationMaster("Host", 10000, "");
+ @Test(timeout = 60000)
+ public void testDemotionFromAcquired() throws YarnException, IOException {
+ // setup container request
+ assertEquals(0, amClient.ask.size());
+ assertEquals(0, amClient.release.size());
- testOpportunisticAllocation(
- (AMRMClientImpl) amClient);
+ amClient.addContainerRequest(
+ new AMRMClient.ContainerRequest(capability, null, null, priority3));
- testAllocation((AMRMClientImpl)amClient);
+ int guarContainersRequestedAny = amClient.getTable(0).get(priority3,
+ ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
+ .remoteRequest.getNumContainers();
- amClient
- .unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null,
- null);
+ assertEquals(1, guarContainersRequestedAny);
- } finally {
- if (amClient != null &&
- amClient.getServiceState() == Service.STATE.STARTED) {
- amClient.stop();
+ assertEquals(1, amClient.ask.size());
+ assertEquals(0, amClient.release.size());
+
+ // RM should allocate container within 2 calls to allocate()
+ int allocatedContainerCount = 0;
+ Map allocatedGuarContainers = new HashMap<>();
+ int iterationsLeft = 50;
+
+ amClient.getNMTokenCache().clearCache();
+ Assert.assertEquals(0,
+ amClient.getNMTokenCache().numberOfTokensInCache());
+ HashMap receivedNMTokens = new HashMap<>();
+
+ updateMetrics("Before Guar Allocation");
+
+ while (allocatedContainerCount < guarContainersRequestedAny
+ && iterationsLeft-- > 0) {
+ AllocateResponse allocResponse = amClient.allocate(0.1f);
+ assertEquals(0, amClient.ask.size());
+ assertEquals(0, amClient.release.size());
+
+ allocatedContainerCount +=
+ allocResponse.getAllocatedContainers().size();
+ for (Container container : allocResponse.getAllocatedContainers()) {
+ if (container.getExecutionType() == ExecutionType.GUARANTEED) {
+ allocatedGuarContainers.put(container.getId(), container);
+ removeCR(container);
+ }
+ }
+
+ for (NMToken token : allocResponse.getNMTokens()) {
+ String nodeID = token.getNodeId().toString();
+ receivedNMTokens.put(nodeID, token.getToken());
+ }
+
+ if (allocatedContainerCount < guarContainersRequestedAny) {
+ // sleep to let NM's heartbeat to RM and trigger allocations
+ sleep(100);
+ }
+ }
+
+ assertEquals(guarContainersRequestedAny, allocatedContainerCount);
+ assertEquals(guarContainersRequestedAny, allocatedGuarContainers.size());
+
+ updateMetrics("After Guar Allocation / Before Demotion");
+
+ try {
+ Container c = allocatedGuarContainers.values().iterator().next();
+ amClient.requestContainerUpdate(
+ c, UpdateContainerRequest.newInstance(c.getVersion(),
+ c.getId(), ContainerUpdateType.DEMOTE_EXECUTION_TYPE,
+ null, ExecutionType.GUARANTEED));
+ Assert.fail("Should throw Exception..");
+ } catch (IllegalArgumentException e) {
+ System.out.println("## " + e.getMessage());
+ Assert.assertTrue(e.getMessage().contains(
+ "target should be OPPORTUNISTIC and original should be GUARANTEED"));
+ }
+
+ Container c = allocatedGuarContainers.values().iterator().next();
+ amClient.requestContainerUpdate(
+ c, UpdateContainerRequest.newInstance(c.getVersion(),
+ c.getId(), ContainerUpdateType.DEMOTE_EXECUTION_TYPE,
+ null, ExecutionType.OPPORTUNISTIC));
+ iterationsLeft = 120;
+ Map updatedContainers = new HashMap<>();
+ // do a few iterations to ensure RM is not going to send new containers
+ while (iterationsLeft-- > 0 && updatedContainers.isEmpty()) {
+ // inform RM of rejection
+ AllocateResponse allocResponse = amClient.allocate(0.1f);
+ // RM did not send new containers because AM does not need any
+ if (allocResponse.getUpdatedContainers() != null) {
+ for (UpdatedContainer updatedContainer : allocResponse
+ .getUpdatedContainers()) {
+ System.out.println("Got update..");
+ updatedContainers.put(updatedContainer.getContainer().getId(),
+ updatedContainer);
+ }
+ }
+ if (iterationsLeft > 0) {
+ // sleep to make sure NM's heartbeat
+ sleep(100);
}
}
+
+ updateMetrics("After Demotion");
+
+ assertEquals(1, updatedContainers.size());
+ for (ContainerId cId : allocatedGuarContainers.keySet()) {
+ Container orig = allocatedGuarContainers.get(cId);
+ UpdatedContainer updatedContainer = updatedContainers.get(cId);
+ assertNotNull(updatedContainer);
+ assertEquals(ExecutionType.OPPORTUNISTIC,
+ updatedContainer.getContainer().getExecutionType());
+ assertEquals(orig.getResource(),
+ updatedContainer.getContainer().getResource());
+ assertEquals(orig.getNodeId(),
+ updatedContainer.getContainer().getNodeId());
+ assertEquals(orig.getVersion() + 1,
+ updatedContainer.getContainer().getVersion());
+ }
+ assertEquals(0, amClient.ask.size());
+ assertEquals(0, amClient.release.size());
+ amClient.ask.clear();
}
- private void testAllocation(
- final AMRMClientImpl amClient)
- throws YarnException, IOException {
+ @Test(timeout = 60000)
+ public void testMixedAllocationAndRelease() throws YarnException,
+ IOException {
// setup container request
assertEquals(0, amClient.ask.size());
assertEquals(0, amClient.release.size());
@@ -274,6 +531,28 @@ private void testAllocation(
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true)));
+ int containersRequestedNode = amClient.getTable(0).get(priority,
+ node, ExecutionType.GUARANTEED, capability).remoteRequest
+ .getNumContainers();
+ int containersRequestedRack = amClient.getTable(0).get(priority,
+ rack, ExecutionType.GUARANTEED, capability).remoteRequest
+ .getNumContainers();
+ int containersRequestedAny = amClient.getTable(0).get(priority,
+ ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
+ .remoteRequest.getNumContainers();
+ int oppContainersRequestedAny =
+ amClient.getTable(0).get(priority2, ResourceRequest.ANY,
+ ExecutionType.OPPORTUNISTIC, capability).remoteRequest
+ .getNumContainers();
+
+ assertEquals(4, containersRequestedNode);
+ assertEquals(4, containersRequestedRack);
+ assertEquals(4, containersRequestedAny);
+ assertEquals(2, oppContainersRequestedAny);
+
+ assertEquals(4, amClient.ask.size());
+ assertEquals(0, amClient.release.size());
+
amClient.removeContainerRequest(
new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
amClient.removeContainerRequest(
@@ -284,16 +563,16 @@ private void testAllocation(
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true)));
- int containersRequestedNode = amClient.getTable(0).get(priority,
+ containersRequestedNode = amClient.getTable(0).get(priority,
node, ExecutionType.GUARANTEED, capability).remoteRequest
.getNumContainers();
- int containersRequestedRack = amClient.getTable(0).get(priority,
+ containersRequestedRack = amClient.getTable(0).get(priority,
rack, ExecutionType.GUARANTEED, capability).remoteRequest
.getNumContainers();
- int containersRequestedAny = amClient.getTable(0).get(priority,
+ containersRequestedAny = amClient.getTable(0).get(priority,
ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
.remoteRequest.getNumContainers();
- int oppContainersRequestedAny =
+ oppContainersRequestedAny =
amClient.getTable(0).get(priority2, ResourceRequest.ANY,
ExecutionType.OPPORTUNISTIC, capability).remoteRequest
.getNumContainers();
@@ -309,7 +588,7 @@ private void testAllocation(
// RM should allocate container within 2 calls to allocate()
int allocatedContainerCount = 0;
int allocatedOpportContainerCount = 0;
- int iterationsLeft = 10;
+ int iterationsLeft = 50;
Set releases = new TreeSet<>();
amClient.getNMTokenCache().clearCache();
@@ -324,8 +603,8 @@ private void testAllocation(
assertEquals(0, amClient.ask.size());
assertEquals(0, amClient.release.size());
- allocatedContainerCount += allocResponse.getAllocatedContainers()
- .size();
+ allocatedContainerCount +=
+ allocResponse.getAllocatedContainers().size();
for (Container container : allocResponse.getAllocatedContainers()) {
if (container.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
allocatedOpportContainerCount++;
@@ -345,9 +624,9 @@ private void testAllocation(
}
}
- assertEquals(allocatedContainerCount,
- containersRequestedAny + oppContainersRequestedAny);
- assertEquals(allocatedOpportContainerCount, oppContainersRequestedAny);
+ assertEquals(containersRequestedAny + oppContainersRequestedAny,
+ allocatedContainerCount);
+ assertEquals(oppContainersRequestedAny, allocatedOpportContainerCount);
for (ContainerId rejectContainerId : releases) {
amClient.releaseAssignedContainer(rejectContainerId);
}
@@ -395,26 +674,25 @@ private void testAllocation(
/**
* Tests allocation with requests comprising only opportunistic containers.
*/
- private void testOpportunisticAllocation(
- final AMRMClientImpl amClient)
- throws YarnException, IOException {
+ @Test(timeout = 60000)
+ public void testOpportunisticAllocation() throws YarnException, IOException {
// setup container request
assertEquals(0, amClient.ask.size());
assertEquals(0, amClient.release.size());
amClient.addContainerRequest(
- new AMRMClient.ContainerRequest(capability, null, null, priority, 0,
+ new AMRMClient.ContainerRequest(capability, null, null, priority3, 0,
true, null,
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true)));
amClient.addContainerRequest(
- new AMRMClient.ContainerRequest(capability, null, null, priority, 0,
+ new AMRMClient.ContainerRequest(capability, null, null, priority3, 0,
true, null,
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true)));
int oppContainersRequestedAny =
- amClient.getTable(0).get(priority, ResourceRequest.ANY,
+ amClient.getTable(0).get(priority3, ResourceRequest.ANY,
ExecutionType.OPPORTUNISTIC, capability).remoteRequest
.getNumContainers();
@@ -456,9 +734,43 @@ private void testOpportunisticAllocation(
}
}
+ assertEquals(oppContainersRequestedAny, allocatedContainerCount);
assertEquals(1, receivedNMTokens.values().size());
}
+ private void removeCR(Container container) {
+ List extends Collection>
+ matchingRequests = amClient.getMatchingRequests(container
+ .getPriority(),
+ ResourceRequest.ANY, ExecutionType.OPPORTUNISTIC,
+ container.getResource());
+ Set toRemove = new HashSet<>();
+ for (Collection rc : matchingRequests) {
+ for (AMRMClient.ContainerRequest cr : rc) {
+ toRemove.add(cr);
+ }
+ }
+ for (AMRMClient.ContainerRequest cr : toRemove) {
+ amClient.removeContainerRequest(cr);
+ }
+ }
+
+ private void updateMetrics(String msg) {
+ AbstractYarnScheduler scheduler =
+ (AbstractYarnScheduler)yarnCluster.getResourceManager()
+ .getResourceScheduler();
+ availMB = scheduler.getRootQueueMetrics().getAvailableMB();
+ availVCores = scheduler.getRootQueueMetrics().getAvailableVirtualCores();
+ allocMB = scheduler.getRootQueueMetrics().getAllocatedMB();
+ allocVCores = scheduler.getRootQueueMetrics().getAllocatedVirtualCores();
+ System.out.println("## METRICS (" + msg + ")==>");
+ System.out.println(" : availMB=" + availMB + ", " +
+ "availVCores=" +availVCores + ", " +
+ "allocMB=" + allocMB + ", " +
+ "allocVCores=" + allocVCores + ", ");
+ System.out.println("<== ##");
+ }
+
private void sleep(int sleepTime) {
try {
Thread.sleep(sleepTime);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/UpdateContainerErrorPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/UpdateContainerErrorPBImpl.java
index fb6c1a7..8ff9d9e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/UpdateContainerErrorPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/UpdateContainerErrorPBImpl.java
@@ -74,6 +74,22 @@ public void setReason(String reason) {
}
@Override
+ public int getCurrentContainerVersion() {
+ YarnServiceProtos.UpdateContainerErrorProtoOrBuilder p =
+ viaProto ? proto : builder;
+ if (!p.hasCurrentContainerVersion()) {
+ return 0;
+ }
+ return p.getCurrentContainerVersion();
+ }
+
+ @Override
+ public void setCurrentContainerVersion(int containerVersion) {
+ maybeInitBuilder();
+ builder.setCurrentContainerVersion(containerVersion);
+ }
+
+ @Override
public UpdateContainerRequest getUpdateContainerRequest() {
YarnServiceProtos.UpdateContainerErrorProtoOrBuilder p = viaProto ? proto
: builder;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
index 94bfd58..224a1da 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
@@ -188,19 +188,25 @@
}
}
}
- checkAndcreateUpdateError(updateErrors, updateReq, msg);
+ checkAndcreateUpdateError(updateErrors, updateReq, rmContainer, msg);
}
return updateRequests;
}
private static void checkAndcreateUpdateError(
List errors, UpdateContainerRequest updateReq,
- String msg) {
+ RMContainer rmContainer, String msg) {
if (msg != null) {
UpdateContainerError updateError = RECORD_FACTORY
.newRecordInstance(UpdateContainerError.class);
updateError.setReason(msg);
updateError.setUpdateContainerRequest(updateReq);
+ if (rmContainer != null) {
+ updateError.setCurrentContainerVersion(
+ rmContainer.getContainer().getVersion());
+ } else {
+ updateError.setCurrentContainerVersion(-1);
+ }
errors.add(updateError);
}
}
@@ -216,9 +222,7 @@ private static String validateContainerIdAndVersion(
// version
if (msg == null && updateReq.getContainerVersion() !=
rmContainer.getContainer().getVersion()) {
- msg = INCORRECT_CONTAINER_VERSION_ERROR + "|"
- + updateReq.getContainerVersion() + "|"
- + rmContainer.getContainer().getVersion();
+ msg = INCORRECT_CONTAINER_VERSION_ERROR;
}
// No more than 1 container update per request.
if (msg == null &&
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
index 641ef64..b083642 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
@@ -251,8 +251,11 @@ public void testContainerPromoteAndDemoteBeforeContainerStart() throws Exception
Assert.assertEquals(0, allocateResponse.getUpdatedContainers().size());
Assert.assertEquals(1, allocateResponse.getUpdateErrors().size());
- Assert.assertEquals("INCORRECT_CONTAINER_VERSION_ERROR|1|0",
+ Assert.assertEquals("INCORRECT_CONTAINER_VERSION_ERROR",
allocateResponse.getUpdateErrors().get(0).getReason());
+ Assert.assertEquals(0,
+ allocateResponse.getUpdateErrors().get(0)
+ .getCurrentContainerVersion());
Assert.assertEquals(container.getId(),
allocateResponse.getUpdateErrors().get(0)
.getUpdateContainerRequest().getContainerId());
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java
index c5829cf..74cecf2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java
@@ -275,8 +275,10 @@ public void testConsecutiveContainerIncreaseAllocationExpiration()
Resources.createResource(5 * GB), null)));
List updateErrors = response.getUpdateErrors();
Assert.assertEquals(1, updateErrors.size());
- Assert.assertEquals("INCORRECT_CONTAINER_VERSION_ERROR|0|1",
+ Assert.assertEquals("INCORRECT_CONTAINER_VERSION_ERROR",
updateErrors.get(0).getReason());
+ Assert.assertEquals(1,
+ updateErrors.get(0).getCurrentContainerVersion());
// am1 asks to change containerId2 from 3GB to 5GB
am1.sendContainerResizingRequest(Collections.singletonList(