check, int checkEveryMillis,
} while (true);
}
+ /**
+ * An abstract callback handler that implements the CallbackHandler
+ * interface and adds callbacks for containers whose resource allocation
+ * has been increased or decreased by the ResourceManager.
+ */
+ public abstract static class AbstractCallbackHandler
+ implements CallbackHandler {
+ /**
+ * Called when the ResourceManager responds to a heartbeat with containers
+ * whose resource allocation has been decreased.
+ */
+ public abstract void onContainersResourceDecreased(
+ List<Container> containers);
+
+ /**
+ * Called when the ResourceManager responds to a heartbeat with containers
+ * whose resource allocation has been increased.
+ */
+ public abstract void onContainersResourceIncreased(
+ List<Container> containers);
+ }
+
public interface CallbackHandler {
-
+
/**
* Called when the ResourceManager responds to a heartbeat with completed
* containers. If the response contains both completed containers and
* allocated containers, this will be called before containersAllocated.
*/
- public void onContainersCompleted(List<ContainerStatus> statuses);
-
+ void onContainersCompleted(List<ContainerStatus> statuses);
+
/**
* Called when the ResourceManager responds to a heartbeat with allocated
* containers. If the response contains both completed containers and
* allocated containers, this will be called after containersCompleted.
*/
- public void onContainersAllocated(List<Container> containers);
-
+ void onContainersAllocated(List<Container> containers);
+
/**
* Called when the ResourceManager wants the ApplicationMaster to shutdown
* for being out of sync etc. The ApplicationMaster should not unregister
* with the RM unless the ApplicationMaster wants to be the last attempt.
*/
- public void onShutdownRequest();
-
+ void onShutdownRequest();
+
/**
* Called when nodes tracked by the ResourceManager have changed in health,
* availability etc.
*/
- public void onNodesUpdated(List<NodeReport> updatedNodes);
-
- public float getProgress();
-
+ void onNodesUpdated(List<NodeReport> updatedNodes);
+
+ float getProgress();
+
/**
* Called when error comes from RM communications as well as from errors in
* the callback itself from the app. Calling
@@ -302,6 +347,6 @@ public void waitFor(Supplier<Boolean> check, int checkEveryMillis,
*
* @param e
*/
- public void onError(Throwable e);
+ void onError(Throwable e);
}
}
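For orientation, here is a minimal sketch (not part of the patch) of an ApplicationMaster handler that plugs into the new AbstractCallbackHandler; the class name and method bodies are illustrative only:

    import java.util.List;
    import org.apache.hadoop.yarn.api.records.Container;
    import org.apache.hadoop.yarn.api.records.ContainerStatus;
    import org.apache.hadoop.yarn.api.records.NodeReport;
    import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;

    // Illustrative sketch only: a concrete handler must implement both the
    // inherited CallbackHandler methods and the two new resource-change ones.
    class MyCallbackHandler extends AMRMClientAsync.AbstractCallbackHandler {
      @Override
      public void onContainersResourceIncreased(List<Container> containers) {
        // The RM granted the larger allocations; the AM can now ask the NMs
        // to enforce the new limits on these containers.
      }
      @Override
      public void onContainersResourceDecreased(List<Container> containers) {
        // The RM acknowledged the smaller allocations.
      }
      @Override
      public void onContainersAllocated(List<Container> containers) { }
      @Override
      public void onContainersCompleted(List<ContainerStatus> statuses) { }
      @Override
      public void onNodesUpdated(List<NodeReport> updatedNodes) { }
      @Override
      public void onShutdownRequest() { }
      @Override
      public void onError(Throwable e) { }
      @Override
      public float getProgress() { return 0.5f; }
    }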
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
index addc3b6..948953f 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
@@ -177,6 +177,18 @@ public void removeContainerRequest(T req) {
client.removeContainerRequest(req);
}
+ @Override
+ public void requestContainerResourceIncrease(
+ ContainerId containerId, Resource capability) {
+ client.requestContainerResourceIncrease(containerId, capability);
+ }
+
+ @Override
+ public void requestContainerResourceDecrease(
+ ContainerId containerId, Resource capability) {
+ client.requestContainerResourceDecrease(containerId, capability);
+ }
+
/**
* Release containers assigned by the Resource Manager. If the app cannot use
* the container or wants to give up the container then it can release them.
@@ -300,6 +312,19 @@ public void run() {
handler.onContainersCompleted(completed);
}
+ if (handler instanceof AMRMClientAsync.AbstractCallbackHandler) {
+ List<Container> decreased = response.getDecreasedContainers();
+ if (!decreased.isEmpty()) {
+ ((AMRMClientAsync.AbstractCallbackHandler) handler)
+ .onContainersResourceDecreased(decreased);
+ }
+ List<Container> increased = response.getIncreasedContainers();
+ if (!increased.isEmpty()) {
+ ((AMRMClientAsync.AbstractCallbackHandler) handler)
+ .onContainersResourceIncreased(increased);
+ }
+ }
+
List<Container> allocated = response.getAllocatedContainers();
if (!allocated.isEmpty()) {
handler.onContainersAllocated(allocated);
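Building on the handler sketch above, an ApplicationMaster could drive the new asynchronous methods roughly like this (illustrative only: imports and checked-exception handling are omitted, and "conf" and "runningContainer" are assumed to already exist):

    // Wire the async client to the handler and request a resource increase.
    AMRMClientAsync<AMRMClient.ContainerRequest> asyncClient =
        AMRMClientAsync.createAMRMClientAsync(1000, new MyCallbackHandler());
    asyncClient.init(conf);
    asyncClient.start();
    asyncClient.registerApplicationMaster("appMasterHost", 0, "");
    // Ask the RM to grow the container to 4 GB / 1 vcore; the grant is
    // reported later through onContainersResourceIncreased().
    asyncClient.requestContainerResourceIncrease(
        runningContainer.getId(), Resource.newInstance(4096, 1));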
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
index 4cf9aa0..acfe3fa 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
@@ -49,7 +49,9 @@
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NMToken;
@@ -72,6 +74,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
+import org.apache.hadoop.yarn.util.resource.Resources;
@Private
@Unstable
@@ -110,8 +113,8 @@
containerRequests = new LinkedHashSet<T>();
}
}
-
-
+
+
/**
* Class compares Resource by memory then cpu in reverse order
*/
@@ -144,10 +147,7 @@ static boolean canFit(Resource arg0, Resource arg1) {
int cpu0 = arg0.getVirtualCores();
int cpu1 = arg1.getVirtualCores();
- if(mem0 <= mem1 && cpu0 <= cpu1) {
- return true;
- }
- return false;
+ return (mem0 <= mem1 && cpu0 <= cpu1);
}
//Key -> Priority
@@ -164,11 +164,22 @@ static boolean canFit(Resource arg0, Resource arg1) {
protected final Set<ResourceRequest> ask = new TreeSet<ResourceRequest>(
new org.apache.hadoop.yarn.api.records.ResourceRequest.ResourceRequestComparator());
protected final Set<ContainerId> release = new TreeSet<ContainerId>();
- // pendingRelease holds history or release requests.request is removed only if
- // RM sends completedContainer.
+ // pendingRelease holds history of release requests.
+ // request is removed only if RM sends completedContainer.
// How it different from release? --> release is for per allocate() request.
protected Set<ContainerId> pendingRelease = new TreeSet<ContainerId>();
-
+ // increase and decrease maps hold container increase and decrease requests
+ // between two allocate() calls, and are cleared after each successful
+ // allocate() call.
+ protected final Map<ContainerId, Resource> increase = new HashMap<>();
+ protected final Map<ContainerId, Resource> decrease = new HashMap<>();
+ // pendingIncrease and pendingDecrease maps hold history of increase and
+ // decrease requests in case AM needs to reregister with the ResourceManager.
+ // Requests are removed from these maps only if RM confirms the increase or
+ // decrease through allocate response.
+ protected final Map<ContainerId, Resource> pendingIncrease = new HashMap<>();
+ protected final Map<ContainerId, Resource> pendingDecrease = new HashMap<>();
+
public AMRMClientImpl() {
super(AMRMClientImpl.class.getName());
}
@@ -241,7 +252,9 @@ public AllocateResponse allocate(float progressIndicator)
AllocateRequest allocateRequest = null;
List<String> blacklistToAdd = new ArrayList<String>();
List<String> blacklistToRemove = new ArrayList<String>();
-
+ List<ContainerResourceChangeRequest> increaseList = new ArrayList<>();
+ List<ContainerResourceChangeRequest> decreaseList = new ArrayList<>();
+
try {
synchronized (this) {
askList = new ArrayList<ResourceRequest>(ask.size());
@@ -252,10 +265,22 @@ public AllocateResponse allocate(float progressIndicator)
r.getResourceName(), r.getCapability(), r.getNumContainers(),
r.getRelaxLocality(), r.getNodeLabelExpression()));
}
+ for (Map.Entry<ContainerId, Resource> entry : decrease.entrySet()) {
+ decreaseList.add(
+ ContainerResourceChangeRequest.newInstance(
+ entry.getKey(), entry.getValue()));
+ }
+ for (Map.Entry<ContainerId, Resource> entry : increase.entrySet()) {
+ increaseList.add(
+ ContainerResourceChangeRequest.newInstance(
+ entry.getKey(), entry.getValue()));
+ }
releaseList = new ArrayList<ContainerId>(release);
// optimistically clear this collection assuming no RPC failure
ask.clear();
release.clear();
+ increase.clear();
+ decrease.clear();
blacklistToAdd.addAll(blacklistAdditions);
blacklistToRemove.addAll(blacklistRemovals);
@@ -266,8 +291,9 @@ public AllocateResponse allocate(float progressIndicator)
allocateRequest =
AllocateRequest.newInstance(lastResponseId, progressIndicator,
- askList, releaseList, blacklistRequest);
- // clear blacklistAdditions and blacklistRemovals before
+ askList, releaseList, blacklistRequest,
+ increaseList, decreaseList);
+ // clear blacklistAdditions and blacklistRemovals before
// unsynchronized part
blacklistAdditions.clear();
blacklistRemovals.clear();
@@ -289,6 +315,8 @@ public AllocateResponse allocate(float progressIndicator)
}
}
}
+ increase.putAll(this.pendingIncrease);
+ decrease.putAll(this.pendingDecrease);
}
// re register with RM
registerApplicationMaster();
@@ -312,6 +340,16 @@ public AllocateResponse allocate(float progressIndicator)
removePendingReleaseRequests(allocateResponse
.getCompletedContainersStatuses());
}
+ if (!pendingIncrease.isEmpty()
+ && !allocateResponse.getIncreasedContainers().isEmpty()) {
+ removePendingChangeRequests(
+ allocateResponse.getIncreasedContainers(), true);
+ }
+ if (!pendingDecrease.isEmpty()
+ && !allocateResponse.getDecreasedContainers().isEmpty()) {
+ removePendingChangeRequests(
+ allocateResponse.getDecreasedContainers(), false);
+ }
}
} finally {
// TODO how to differentiate remote yarn exception vs error in rpc
@@ -333,7 +371,34 @@ public AllocateResponse allocate(float progressIndicator)
ask.add(oldAsk);
}
}
-
+ // increase/decrease requests could have been added during the
+ // allocate call. Those are the newest requests and take precedence
+ // over the requests cached in increaseList and decreaseList.
+ //
+ // Only insert entries from the cached increaseList and decreaseList
+ // that do not exist in either of the current increase and decrease maps:
+ // 1. If the cached increaseList contains the same container as the
+ // new increase map, there is nothing to do, as the request in the
+ // new increase map has the latest value.
+ // 2. If the cached increaseList contains the same container as the
+ // new decrease map, there is nothing to do either, as the request
+ // in the new decrease map is newer and cancels the old increase
+ // request.
+ // 3. The same rules apply to the cached decreaseList.
+ for (ContainerResourceChangeRequest oldIncrease : increaseList) {
+ ContainerId oldContainerId = oldIncrease.getContainerId();
+ if (increase.get(oldContainerId) == null
+ && decrease.get(oldContainerId) == null) {
+ increase.put(oldContainerId, oldIncrease.getCapability());
+ }
+ }
+ for (ContainerResourceChangeRequest oldDecrease : decreaseList) {
+ ContainerId oldContainerId = oldDecrease.getContainerId();
+ if (decrease.get(oldContainerId) == null
+ && increase.get(oldContainerId) == null) {
+ decrease.put(oldContainerId, oldDecrease.getCapability());
+ }
+ }
blacklistAdditions.addAll(blacklistToAdd);
blacklistRemovals.addAll(blacklistToRemove);
}
@@ -349,6 +414,44 @@ protected void removePendingReleaseRequests(
}
}
+ protected void removePendingChangeRequests(
+ List<Container> changedContainers, boolean isIncrease) {
+ for (Container changedContainer : changedContainers) {
+ ContainerId containerId = changedContainer.getId();
+ Resource targetCapability;
+ if (isIncrease) {
+ targetCapability = pendingIncrease.get(containerId);
+ } else {
+ targetCapability = pendingDecrease.get(containerId);
+ }
+ if (targetCapability == null) {
+ continue;
+ }
+ Resource changedCapability = changedContainer.getResource();
+ if (isIncrease) {
+ if (Resources.fitsIn(targetCapability, changedCapability)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("RM has confirmed increased resource allocation for "
+ + "container " + containerId + ". Current resource allocation:"
+ + changedCapability + ". Remove pending increase request:"
+ + targetCapability);
+ }
+ pendingIncrease.remove(containerId);
+ }
+ } else {
+ if (Resources.fitsIn(changedCapability, targetCapability)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("RM has confirmed decreased resource allocation for "
+ + "container " + containerId + ". Current resource allocation:"
+ + changedCapability + ". Remove pending decrease request:"
+ + targetCapability);
+ }
+ pendingDecrease.remove(containerId);
+ }
+ }
+ }
+ }
+
@Private
@VisibleForTesting
protected void populateNMTokens(List<NMToken> nmTokens) {
@@ -480,6 +583,26 @@ public synchronized void removeContainerRequest(T req) {
}
@Override
+ public synchronized void requestContainerResourceIncrease(
+ ContainerId containerId, Resource capability) {
+ validateContainerResourceChangeRequest(containerId, capability);
+ pendingDecrease.remove(containerId);
+ decrease.remove(containerId);
+ pendingIncrease.put(containerId, capability);
+ increase.put(containerId, capability);
+ }
+
+ @Override
+ public synchronized void requestContainerResourceDecrease(
+ ContainerId containerId, Resource capability) {
+ validateContainerResourceChangeRequest(containerId, capability);
+ pendingIncrease.remove(containerId);
+ increase.remove(containerId);
+ pendingDecrease.put(containerId, capability);
+ decrease.put(containerId, capability);
+ }
+
+ @Override
public synchronized void releaseAssignedContainer(ContainerId containerId) {
Preconditions.checkArgument(containerId != null,
"ContainerId can not be null.");
@@ -618,7 +741,18 @@ private void checkNodeLabelExpression(T containerRequest) {
"Cannot specify node label with rack and node");
}
}
-
+
+ private void validateContainerResourceChangeRequest(
+ ContainerId containerId, Resource capability) {
+ Preconditions.checkArgument(containerId != null,
+ "ContainerId cannot be null");
+ Preconditions.checkArgument(capability != null,
+ "Target resource capability cannot be null");
+ Preconditions.checkArgument(!Resources.equals(Resources.none(), capability)
+ && Resources.fitsIn(Resources.none(), capability),
+ "Target resource capability must be greater than 0");
+ }
+
private void addResourceRequestToAsk(ResourceRequest remoteRequest) {
// This code looks weird but is needed because of the following scenario.
// A ResourceRequest is removed from the remoteRequestTable. A 0 container
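On the blocking client the pattern is symmetrical: queued change requests ride along with the next allocate() call, and confirmations come back on the AllocateResponse. A minimal sketch, assuming "amClient" and "containerId" already exist and using illustrative values:

    // Queue a decrease to 512 MB / 1 vcore; it is sent with the next
    // allocate() and confirmed once the RM reports the container at (or
    // below) that capability.
    amClient.requestContainerResourceDecrease(
        containerId, Resource.newInstance(512, 1));
    AllocateResponse response = amClient.allocate(0.1f);
    for (Container decreased : response.getDecreasedContainers()) {
      // decreased.getResource() reflects the reduced allocation; the matching
      // entry has been removed from pendingDecrease by
      // removePendingChangeRequests().
    }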
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java
index 74d4aa4..54aa451 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java
@@ -74,12 +74,18 @@ public void testAMRMClientAsync() throws Exception {
List<ContainerStatus> completed1 = Arrays.asList(
ContainerStatus.newInstance(newContainerId(0, 0, 0, 0),
ContainerState.COMPLETE, "", 0));
- List<Container> allocated1 = Arrays.asList(
+ List<Container> containers = Arrays.asList(
Container.newInstance(null, null, null, null, null, null));
final AllocateResponse response1 = createAllocateResponse(
- new ArrayList<ContainerStatus>(), allocated1, null);
+ new ArrayList<ContainerStatus>(), containers, null);
final AllocateResponse response2 = createAllocateResponse(completed1,
new ArrayList<Container>(), null);
+ final AllocateResponse response3 = createAllocateResponse(
+ new ArrayList<ContainerStatus>(), new ArrayList<Container>(),
+ containers, null, null);
+ final AllocateResponse response4 = createAllocateResponse(
+ new ArrayList<ContainerStatus>(), new ArrayList<Container>(),
+ null, containers, null);
final AllocateResponse emptyResponse = createAllocateResponse(
new ArrayList<ContainerStatus>(), new ArrayList<Container>(), null);
@@ -99,7 +105,7 @@ public AllocateResponse answer(InvocationOnMock invocation)
secondHeartbeatSync.incrementAndGet();
return response2;
}
- }).thenReturn(emptyResponse);
+ }).thenReturn(response3).thenReturn(response4).thenReturn(emptyResponse);
when(client.registerApplicationMaster(anyString(), anyInt(), anyString()))
.thenReturn(null);
when(client.getAvailableResources()).thenAnswer(new Answer<Resource>() {
@@ -146,12 +152,20 @@ public Resource answer(InvocationOnMock invocation)
Assert.assertEquals(null, callbackHandler.takeCompletedContainers());
Thread.sleep(10);
}
-
+
// wait for the completed containers from the second heartbeat's response
while (callbackHandler.takeCompletedContainers() == null) {
Thread.sleep(10);
}
-
+
+ while (callbackHandler.takeIncreasedContainers() == null) {
+ Thread.sleep(10);
+ }
+
+ while (callbackHandler.takeDecreasedContainers() == null) {
+ Thread.sleep(10);
+ }
+
asyncClient.stop();
Assert.assertEquals(null, callbackHandler.takeAllocatedContainers());
@@ -397,6 +411,17 @@ private AllocateResponse createAllocateResponse(
return response;
}
+ private AllocateResponse createAllocateResponse(
+ List<ContainerStatus> completed, List<Container> allocated,
+ List<Container> increased, List<Container> decreased,
+ List<NMToken> nmTokens) {
+ AllocateResponse response =
+ AllocateResponse.newInstance(0, completed, allocated,
+ new ArrayList<NodeReport>(), null, null, 1, null, nmTokens,
+ increased, decreased);
+ return response;
+ }
+
public static ContainerId newContainerId(int appId, int appAttemptId,
long timestamp, int containerId) {
ApplicationId applicationId = ApplicationId.newInstance(timestamp, appId);
@@ -405,9 +430,12 @@ public static ContainerId newContainerId(int appId, int appAttemptId,
return ContainerId.newContainerId(applicationAttemptId, containerId);
}
- private class TestCallbackHandler implements AMRMClientAsync.CallbackHandler {
+ private class TestCallbackHandler
+ extends AMRMClientAsync.AbstractCallbackHandler {
private volatile List<ContainerStatus> completedContainers;
private volatile List<Container> allocatedContainers;
+ private volatile List<Container> increasedContainers;
+ private volatile List<Container> decreasedContainers;
Exception savedException = null;
volatile boolean reboot = false;
Object notifier = new Object();
@@ -425,7 +453,31 @@ public static ContainerId newContainerId(int appId, int appAttemptId,
}
return ret;
}
-
+
+ public List<Container> takeDecreasedContainers() {
+ List<Container> ret = decreasedContainers;
+ if (ret == null) {
+ return null;
+ }
+ decreasedContainers = null;
+ synchronized (ret) {
+ ret.notify();
+ }
+ return ret;
+ }
+
+ public List<Container> takeIncreasedContainers() {
+ List<Container> ret = increasedContainers;
+ if (ret == null) {
+ return null;
+ }
+ increasedContainers = null;
+ synchronized (ret) {
+ ret.notify();
+ }
+ return ret;
+ }
+
public List<Container> takeAllocatedContainers() {
List<Container> ret = allocatedContainers;
if (ret == null) {
@@ -454,6 +506,34 @@ public void onContainersCompleted(List<ContainerStatus> statuses) {
}
@Override
+ public void onContainersResourceDecreased(List<Container> containers) {
+ decreasedContainers = containers;
+ synchronized (decreasedContainers) {
+ while (decreasedContainers != null) {
+ try {
+ decreasedContainers.wait();
+ } catch (InterruptedException ex) {
+ LOG.error("Interrupted during wait", ex);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void onContainersResourceIncreased(List<Container> containers) {
+ increasedContainers = containers;
+ synchronized (increasedContainers) {
+ while (increasedContainers != null) {
+ try {
+ increasedContainers.wait();
+ } catch (InterruptedException ex) {
+ LOG.error("Interrupted during wait", ex);
+ }
+ }
+ }
+ }
+
+ @Override
public void onContainersAllocated(List<Container> containers) {
allocatedContainers = containers;
// wait for containers to be taken before returning
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
index 7d29d05..6e00a1b 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
@@ -39,6 +39,7 @@
import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil;
@@ -73,6 +74,7 @@
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.InvalidContainerRequestException;
+import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.client.api.NMTokenCache;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -126,6 +128,8 @@ public static void setup() throws Exception {
rolling_interval_sec);
conf.setLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, am_expire_ms);
conf.setInt(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 100);
+ // set the minimum allocation so that resource decrease can go under 1024
+ conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1);
yarnCluster = new MiniYARNCluster(TestAMRMClient.class.getName(), nodeCount, 1, 1);
yarnCluster.init(conf);
@@ -730,8 +734,156 @@ public void testAskWithInvalidNodeLabels() {
new ContainerRequest(Resource.newInstance(1024, 1), null, null,
Priority.UNDEFINED, true, "x && y"));
}
-
- private void testAllocation(final AMRMClientImpl<ContainerRequest> amClient)
+
+ @Test(timeout=60000)
+ public void testAMRMClientWithContainerResourceChange()
+ throws YarnException, IOException {
+ AMRMClient<ContainerRequest> amClient = null;
+ try {
+ // start am rm client
+ amClient = AMRMClient.createAMRMClient();
+ Assert.assertNotNull(amClient);
+ // asserting we are using the singleton instance cache
+ Assert.assertSame(
+ NMTokenCache.getSingleton(), amClient.getNMTokenCache());
+ amClient.init(conf);
+ amClient.start();
+ assertEquals(STATE.STARTED, amClient.getServiceState());
+ // start am nm client
+ NMClientImpl nmClient = (NMClientImpl) NMClient.createNMClient();
+ Assert.assertNotNull(nmClient);
+ // asserting we are using the singleton instance cache
+ Assert.assertSame(
+ NMTokenCache.getSingleton(), nmClient.getNMTokenCache());
+ nmClient.init(conf);
+ nmClient.start();
+ assertEquals(STATE.STARTED, nmClient.getServiceState());
+ // am rm client register the application master with RM
+ amClient.registerApplicationMaster("Host", 10000, "");
+ // allocate two containers and make sure they are in RUNNING state
+ List<Container> containers =
+ allocateAndStartContainers(amClient, nmClient, 2);
+ // perform container resource increase and decrease tests
+ doContainerResourceChange(amClient, containers);
+ // unregister and finish up the test
+ amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
+ null, null);
+ } finally {
+ if (amClient != null && amClient.getServiceState() == STATE.STARTED) {
+ amClient.stop();
+ }
+ }
+ }
+
+ private List<Container> allocateAndStartContainers(
+ final AMRMClient<ContainerRequest> amClient, final NMClient nmClient,
+ int num) throws YarnException, IOException {
+ // set up allocation requests
+ for (int i = 0; i < num; ++i) {
+ amClient.addContainerRequest(
+ new ContainerRequest(capability, nodes, racks, priority));
+ }
+ // send allocation requests
+ amClient.allocate(0.1f);
+ // sleep to let NM's heartbeat to RM and trigger allocations
+ sleep(150);
+ // get allocations
+ AllocateResponse allocResponse = amClient.allocate(0.1f);
+ List<Container> containers = allocResponse.getAllocatedContainers();
+ Assert.assertEquals(num, containers.size());
+ // build container launch context
+ Credentials ts = new Credentials();
+ DataOutputBuffer dob = new DataOutputBuffer();
+ ts.writeTokenStorageToStream(dob);
+ ByteBuffer securityTokens =
+ ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+ // start a process long enough for increase/decrease action to take effect
+ ContainerLaunchContext clc = BuilderUtils.newContainerLaunchContext(
+ Collections.<String, LocalResource>emptyMap(),
+ new HashMap<String, String>(), Arrays.asList("sleep", "100"),
+ new HashMap<String, ByteBuffer>(), securityTokens,
+ new HashMap<ApplicationAccessType, String>());
+ // start the containers and make sure they are in RUNNING state
+ try {
+ for (int i = 0; i < num; i++) {
+ Container container = containers.get(i);
+ nmClient.startContainer(container, clc);
+ // NodeManager may still need some time to get the stable
+ // container status
+ while (true) {
+ ContainerStatus status = nmClient.getContainerStatus(
+ container.getId(), container.getNodeId());
+ if (status.getState() == ContainerState.RUNNING) {
+ break;
+ }
+ sleep(100);
+ }
+ }
+ } catch (YarnException e) {
+ throw new AssertionError("Exception is not expected: " + e);
+ }
+ // sleep to let NM's heartbeat to RM to confirm container launch
+ sleep(200);
+ return containers;
+ }
+
+
+ private void doContainerResourceChange(
+ final AMRMClient<ContainerRequest> amClient, List<Container> containers)
+ throws YarnException, IOException {
+ Assert.assertEquals(2, containers.size());
+ // remember the container IDs
+ ContainerId container1 = containers.get(0).getId();
+ ContainerId container2 = containers.get(1).getId();
+ AMRMClientImpl<ContainerRequest> amClientImpl =
+ (AMRMClientImpl<ContainerRequest>) amClient;
+ Assert.assertNotEquals(container2, container1);
+ Assert.assertEquals(0, amClientImpl.increase.size());
+ Assert.assertEquals(0, amClientImpl.decrease.size());
+ // verify newer request overwrites older request for the container1
+ amClientImpl.requestContainerResourceIncrease(
+ container1, Resource.newInstance(2048, 1));
+ amClientImpl.requestContainerResourceIncrease(
+ container1, Resource.newInstance(4096, 1));
+ Assert.assertEquals(
+ Resource.newInstance(4096, 1), amClientImpl.increase.get(container1));
+ // verify new decrease request cancels old increase request for container1
+ amClientImpl.requestContainerResourceDecrease(
+ container1, Resource.newInstance(512, 1));
+ Assert.assertTrue(amClientImpl.increase.get(container1) == null);
+ Assert.assertEquals(
+ Resource.newInstance(512, 1), amClientImpl.decrease.get(container1));
+ // request resource increase for container2
+ amClientImpl.requestContainerResourceIncrease(
+ container2, Resource.newInstance(2048, 1));
+ Assert.assertTrue(amClientImpl.decrease.get(container2) == null);
+ Assert.assertEquals(
+ Resource.newInstance(2048, 1), amClientImpl.increase.get(container2));
+ // as of now: container1 asks to decrease to (512, 1)
+ // container2 asks to increase to (2048, 1)
+ // send allocation requests
+ AllocateResponse allocResponse = amClient.allocate(0.1f);
+ Assert.assertEquals(0, amClientImpl.increase.size());
+ Assert.assertEquals(0, amClientImpl.decrease.size());
+ // we should get decrease confirmation right away
+ List<Container> decreasedContainers =
+ allocResponse.getDecreasedContainers();
+ List<Container> increasedContainers =
+ allocResponse.getIncreasedContainers();
+ Assert.assertEquals(1, decreasedContainers.size());
+ Assert.assertEquals(0, increasedContainers.size());
+ // we should get increase allocation after the next NM's heartbeat to RM
+ sleep(150);
+ // get allocations
+ allocResponse = amClient.allocate(0.1f);
+ decreasedContainers =
+ allocResponse.getDecreasedContainers();
+ increasedContainers =
+ allocResponse.getIncreasedContainers();
+ Assert.assertEquals(1, increasedContainers.size());
+ Assert.assertEquals(0, decreasedContainers.size());
+ }
+
+ private void testAllocation(final AMRMClientImpl<ContainerRequest> amClient)
throws YarnException, IOException {
// setup container request