diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java index bfe10d6..f62b593 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java @@ -285,7 +285,7 @@ public abstract void unregisterApplicationMaster(FinalApplicationStatus appStatu * @param req Resource request */ public abstract void addContainerRequest(T req); - + /** * Remove previous container request. The previous container request may have * already been sent to the ResourceManager. So even after the remove request @@ -294,7 +294,29 @@ public abstract void unregisterApplicationMaster(FinalApplicationStatus appStatu * @param req Resource request */ public abstract void removeContainerRequest(T req); - + + /** + * Request container resource increase before calling allocate. + * Any pending resource decrease request of the same container that has not + * been sent to the ResourceManager will be cancelled. + * + * @param containerId The container ID + * @param capability The target resource capability of the container + */ + public abstract void requestContainerResourceIncrease( + ContainerId containerId, Resource capability); + + /** + * Request container resource decrease before calling allocate. + * Any pending resource increase request of the same container that has not + * been sent to the ResourceManager will be cancelled. + * + * @param containerId The container ID + * @param capability The target resource capability of the container + */ + public abstract void requestContainerResourceDecrease( + ContainerId containerId, Resource capability); + /** * Release containers assigned by the Resource Manager. If the app cannot use * the container or wants to give up the container then it can release them. diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java index f62e71b..e69e3a3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java @@ -60,7 +60,7 @@ * public void onContainersAllocated(List containers) { * [run tasks on the containers] * } - * + * * public void onContainersCompleted(List statuses) { * [update progress, check whether app is done] * } @@ -172,6 +172,28 @@ public abstract void unregisterApplicationMaster( public abstract void removeContainerRequest(T req); /** + * Request container resource increase before calling allocate. + * Any pending resource decrease request of the same container that has not + * been sent to the ResourceManager will be cancelled. + * + * @param containerId The container ID + * @param capability The target resource capability of the container + */ + public abstract void requestContainerResourceIncrease( + ContainerId containerId, Resource capability); + + /** + * Request container resource decrease before calling allocate. + * Any pending resource increase request of the same container that has not + * been sent to the ResourceManager will be cancelled. + * + * @param containerId The container ID + * @param capability The target resource capability of the container + */ + public abstract void requestContainerResourceDecrease( + ContainerId containerId, Resource capability); + + /** * Release containers assigned by the Resource Manager. If the app cannot use * the container or wants to give up the container then it can release them. * The app needs to make new requests for the released resource capability if @@ -264,6 +286,29 @@ public void waitFor(Supplier check, int checkEveryMillis, } while (true); } + /** + *

+ * An abstract callback class which implements the CallbackHandler interface + * and defines additional methods. + *

+ */ + public abstract static class AbstractCallbackHandler + implements CallbackHandler { + /** + * Called when the ResourceManager responds to a heartbeat with containers + * whose resource allocation has been decreased. + */ + public abstract void onContainersResourceDecreased( + List containers); + + /** + * Called when the ResourceManager responds to a heartbeat with containers + * whose resource allocation has been increased. + */ + public abstract void onContainersResourceIncreased( + List containers); + } + public interface CallbackHandler { /** @@ -271,29 +316,29 @@ public void waitFor(Supplier check, int checkEveryMillis, * containers. If the response contains both completed containers and * allocated containers, this will be called before containersAllocated. */ - public void onContainersCompleted(List statuses); - + void onContainersCompleted(List statuses); + /** * Called when the ResourceManager responds to a heartbeat with allocated * containers. If the response containers both completed containers and * allocated containers, this will be called after containersCompleted. */ - public void onContainersAllocated(List containers); + void onContainersAllocated(List containers); /** * Called when the ResourceManager wants the ApplicationMaster to shutdown * for being out of sync etc. The ApplicationMaster should not unregister * with the RM unless the ApplicationMaster wants to be the last attempt. */ - public void onShutdownRequest(); + void onShutdownRequest(); /** * Called when nodes tracked by the ResourceManager have changed in health, * availability etc. */ - public void onNodesUpdated(List updatedNodes); + void onNodesUpdated(List updatedNodes); - public float getProgress(); + float getProgress(); /** * Called when error comes from RM communications as well as from errors in @@ -302,6 +347,6 @@ public void waitFor(Supplier check, int checkEveryMillis, * * @param e */ - public void onError(Throwable e); + void onError(Throwable e); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java index addc3b6..948953f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java @@ -177,6 +177,18 @@ public void removeContainerRequest(T req) { client.removeContainerRequest(req); } + @Override + public void requestContainerResourceIncrease( + ContainerId containerId, Resource capability) { + client.requestContainerResourceIncrease(containerId, capability); + } + + @Override + public void requestContainerResourceDecrease( + ContainerId containerId, Resource capability) { + client.requestContainerResourceDecrease(containerId, capability); + } + /** * Release containers assigned by the Resource Manager. If the app cannot use * the container or wants to give up the container then it can release them. @@ -300,6 +312,19 @@ public void run() { handler.onContainersCompleted(completed); } + if (handler instanceof AMRMClientAsync.AbstractCallbackHandler) { + List decreased = response.getDecreasedContainers(); + if (!decreased.isEmpty()) { + ((AMRMClientAsync.AbstractCallbackHandler) handler) + .onContainersResourceDecreased(decreased); + } + List increased = response.getIncreasedContainers(); + if (!increased.isEmpty()) { + ((AMRMClientAsync.AbstractCallbackHandler) handler) + .onContainersResourceIncreased(increased); + } + } + List allocated = response.getAllocatedContainers(); if (!allocated.isEmpty()) { handler.onContainersAllocated(allocated); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java index 4cf9aa0..c48e3f4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java @@ -49,7 +49,9 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NMToken; @@ -72,6 +74,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; +import org.apache.hadoop.yarn.util.resource.Resources; @Private @Unstable @@ -110,8 +113,8 @@ containerRequests = new LinkedHashSet(); } } - - + + /** * Class compares Resource by memory then cpu in reverse order */ @@ -144,10 +147,7 @@ static boolean canFit(Resource arg0, Resource arg1) { int cpu0 = arg0.getVirtualCores(); int cpu1 = arg1.getVirtualCores(); - if(mem0 <= mem1 && cpu0 <= cpu1) { - return true; - } - return false; + return (mem0 <= mem1 && cpu0 <= cpu1); } //Key -> Priority @@ -164,11 +164,22 @@ static boolean canFit(Resource arg0, Resource arg1) { protected final Set ask = new TreeSet( new org.apache.hadoop.yarn.api.records.ResourceRequest.ResourceRequestComparator()); protected final Set release = new TreeSet(); - // pendingRelease holds history or release requests.request is removed only if - // RM sends completedContainer. + // pendingRelease holds history of release requests. + // request is removed only if RM sends completedContainer. // How it different from release? --> release is for per allocate() request. protected Set pendingRelease = new TreeSet(); - + // increase and decrease maps hold container increase and decrease requests + // between two allocate() calls, and are cleared after each successful + // allocate() call. + protected final Map increase = new HashMap<>(); + protected final Map decrease = new HashMap<>(); + // pendingIncrease and pendingDecrease maps hold history of increase and + // decrease requests in case AM needs to reregister with the ResourceManager. + // Requests are removed from these maps only if RM confirms the increase or + // decrease through allocate response. + protected final Map pendingIncrease = new HashMap<>(); + protected final Map pendingDecrease = new HashMap<>(); + public AMRMClientImpl() { super(AMRMClientImpl.class.getName()); } @@ -241,7 +252,9 @@ public AllocateResponse allocate(float progressIndicator) AllocateRequest allocateRequest = null; List blacklistToAdd = new ArrayList(); List blacklistToRemove = new ArrayList(); - + List increaseList = new ArrayList<>(); + List decreaseList = new ArrayList<>(); + try { synchronized (this) { askList = new ArrayList(ask.size()); @@ -252,10 +265,22 @@ public AllocateResponse allocate(float progressIndicator) r.getResourceName(), r.getCapability(), r.getNumContainers(), r.getRelaxLocality(), r.getNodeLabelExpression())); } + for (Map.Entry entry : decrease.entrySet()) { + decreaseList.add( + ContainerResourceChangeRequest.newInstance( + entry.getKey(), entry.getValue())); + } + for(Map.Entry entry : increase.entrySet()) { + increaseList.add( + ContainerResourceChangeRequest.newInstance( + entry.getKey(), entry.getValue())); + } releaseList = new ArrayList(release); // optimistically clear this collection assuming no RPC failure ask.clear(); release.clear(); + increase.clear(); + decrease.clear(); blacklistToAdd.addAll(blacklistAdditions); blacklistToRemove.addAll(blacklistRemovals); @@ -266,8 +291,9 @@ public AllocateResponse allocate(float progressIndicator) allocateRequest = AllocateRequest.newInstance(lastResponseId, progressIndicator, - askList, releaseList, blacklistRequest); - // clear blacklistAdditions and blacklistRemovals before + askList, releaseList, blacklistRequest, + increaseList, decreaseList); + // clear blacklistAdditions and blacklistRemovals before // unsynchronized part blacklistAdditions.clear(); blacklistRemovals.clear(); @@ -289,6 +315,8 @@ public AllocateResponse allocate(float progressIndicator) } } } + increase.putAll(this.pendingIncrease); + decrease.putAll(this.pendingDecrease); } // re register with RM registerApplicationMaster(); @@ -312,6 +340,16 @@ public AllocateResponse allocate(float progressIndicator) removePendingReleaseRequests(allocateResponse .getCompletedContainersStatuses()); } + if (!pendingIncrease.isEmpty() + && !allocateResponse.getIncreasedContainers().isEmpty()) { + removePendingChangeRequests( + allocateResponse.getDecreasedContainers(), true); + } + if (!pendingDecrease.isEmpty() + && !allocateResponse.getDecreasedContainers().isEmpty()) { + removePendingChangeRequests( + allocateResponse.getDecreasedContainers(), false); + } } } finally { // TODO how to differentiate remote yarn exception vs error in rpc @@ -333,7 +371,34 @@ public AllocateResponse allocate(float progressIndicator) ask.add(oldAsk); } } - + // increase/decrease requests could have been added during the + // allocate call. Those are the newest requests which take precedence + // over requests cached in the increaseList and decreaseList. + // + // Only insert entries from the cached increaseList and decreaseList + // that do not exist in either of current decrease and increase maps: + // 1. If the cached increaseList contains the same container as that + // in the new increase map, then there is nothing to do as the + // the request in the new increase map has the latest value. + // 2. If the cached increaseList contains the same container as that + // in the new decrease map, then there is nothing to do either as + // the request in the new decrease map is newer and should cancel + // the old increase request. + // 3. The above also apply to the decreaseList. + for (ContainerResourceChangeRequest oldIncrease : increaseList) { + ContainerId oldContainerId = oldIncrease.getContainerId(); + if (increase.get(oldContainerId) == null + && decrease.get(oldContainerId) == null) { + increase.put(oldContainerId, oldIncrease.getCapability()); + } + } + for (ContainerResourceChangeRequest oldDecrease : decreaseList) { + ContainerId oldContainerId = oldDecrease.getContainerId(); + if (decrease.get(oldContainerId) == null + && increase.get(oldContainerId) == null) { + decrease.put(oldContainerId, oldDecrease.getCapability()); + } + } blacklistAdditions.addAll(blacklistToAdd); blacklistRemovals.addAll(blacklistToRemove); } @@ -349,6 +414,40 @@ protected void removePendingReleaseRequests( } } + protected void removePendingChangeRequests( + List changedContainers, boolean isIncrease) { + for (Container changedContainer : changedContainers) { + ContainerId containerId = changedContainer.getId(); + Resource targetCapability; + if (isIncrease) { + targetCapability = pendingIncrease.get(containerId); + } else { + targetCapability = pendingDecrease.get(containerId); + } + if (targetCapability == null) { + continue; + } + Resource changedCapability = changedContainer.getResource(); + if (isIncrease) { + if (Resources.fitsIn(targetCapability, changedCapability)) { + LOG.debug("RM has confirmed increased resource allocation for " + + "container " + containerId + ". Current resource allocation:" + + changedCapability + ". Remove pending increase request:" + + targetCapability); + pendingIncrease.remove(containerId); + } + } else { + if (Resources.fitsIn(changedCapability, targetCapability)) { + LOG.debug("RM has confirmed decreased resource allocation for " + + "container " + containerId + ". Current resource allocation:" + + changedCapability + ". Remove pending decrease request:" + + targetCapability); + pendingDecrease.remove(containerId); + } + } + } + } + @Private @VisibleForTesting protected void populateNMTokens(List nmTokens) { @@ -480,6 +579,26 @@ public synchronized void removeContainerRequest(T req) { } @Override + public synchronized void requestContainerResourceIncrease( + ContainerId containerId, Resource capability) { + validateContainerResourceChangeRequest(containerId, capability); + pendingDecrease.remove(containerId); + decrease.remove(containerId); + pendingIncrease.put(containerId, capability); + increase.put(containerId, capability); + } + + @Override + public synchronized void requestContainerResourceDecrease( + ContainerId containerId, Resource capability) { + validateContainerResourceChangeRequest(containerId, capability); + pendingIncrease.remove(containerId); + increase.remove(containerId); + pendingDecrease.put(containerId, capability); + decrease.put(containerId, capability); + } + + @Override public synchronized void releaseAssignedContainer(ContainerId containerId) { Preconditions.checkArgument(containerId != null, "ContainerId can not be null."); @@ -618,7 +737,18 @@ private void checkNodeLabelExpression(T containerRequest) { "Cannot specify node label with rack and node"); } } - + + private void validateContainerResourceChangeRequest( + ContainerId containerId, Resource capability) { + Preconditions.checkArgument(containerId != null, + "ContainerId cannot be null"); + Preconditions.checkArgument(capability != null, + "Target resource capability cannot be null"); + Preconditions.checkArgument(!Resources.equals(Resources.none(), capability) + && Resources.fitsIn(Resources.none(), capability), + "Target resource capability must be greater than 0"); + } + private void addResourceRequestToAsk(ResourceRequest remoteRequest) { // This code looks weird but is needed because of the following scenario. // A ResourceRequest is removed from the remoteRequestTable. A 0 container diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java index 74d4aa4..54aa451 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java @@ -74,12 +74,18 @@ public void testAMRMClientAsync() throws Exception { List completed1 = Arrays.asList( ContainerStatus.newInstance(newContainerId(0, 0, 0, 0), ContainerState.COMPLETE, "", 0)); - List allocated1 = Arrays.asList( + List containers = Arrays.asList( Container.newInstance(null, null, null, null, null, null)); final AllocateResponse response1 = createAllocateResponse( - new ArrayList(), allocated1, null); + new ArrayList(), containers, null); final AllocateResponse response2 = createAllocateResponse(completed1, new ArrayList(), null); + final AllocateResponse response3 = createAllocateResponse( + new ArrayList(), new ArrayList(), + containers, null, null); + final AllocateResponse response4 = createAllocateResponse( + new ArrayList(), new ArrayList(), + null, containers, null); final AllocateResponse emptyResponse = createAllocateResponse( new ArrayList(), new ArrayList(), null); @@ -99,7 +105,7 @@ public AllocateResponse answer(InvocationOnMock invocation) secondHeartbeatSync.incrementAndGet(); return response2; } - }).thenReturn(emptyResponse); + }).thenReturn(response3).thenReturn(response4).thenReturn(emptyResponse); when(client.registerApplicationMaster(anyString(), anyInt(), anyString())) .thenReturn(null); when(client.getAvailableResources()).thenAnswer(new Answer() { @@ -146,12 +152,20 @@ public Resource answer(InvocationOnMock invocation) Assert.assertEquals(null, callbackHandler.takeCompletedContainers()); Thread.sleep(10); } - + // wait for the completed containers from the second heartbeat's response while (callbackHandler.takeCompletedContainers() == null) { Thread.sleep(10); } - + + while (callbackHandler.takeIncreasedContainers() == null) { + Thread.sleep(10); + } + + while (callbackHandler.takeDecreasedContainers() == null) { + Thread.sleep(10); + } + asyncClient.stop(); Assert.assertEquals(null, callbackHandler.takeAllocatedContainers()); @@ -397,6 +411,17 @@ private AllocateResponse createAllocateResponse( return response; } + private AllocateResponse createAllocateResponse( + List completed, List allocated, + List increased, List decreased, + List nmTokens) { + AllocateResponse response = + AllocateResponse.newInstance(0, completed, allocated, + new ArrayList(), null, null, 1, null, nmTokens, + increased, decreased); + return response; + } + public static ContainerId newContainerId(int appId, int appAttemptId, long timestamp, int containerId) { ApplicationId applicationId = ApplicationId.newInstance(timestamp, appId); @@ -405,9 +430,12 @@ public static ContainerId newContainerId(int appId, int appAttemptId, return ContainerId.newContainerId(applicationAttemptId, containerId); } - private class TestCallbackHandler implements AMRMClientAsync.CallbackHandler { + private class TestCallbackHandler + extends AMRMClientAsync.AbstractCallbackHandler { private volatile List completedContainers; private volatile List allocatedContainers; + private volatile List increasedContainers; + private volatile List decreasedContainers; Exception savedException = null; volatile boolean reboot = false; Object notifier = new Object(); @@ -425,7 +453,31 @@ public static ContainerId newContainerId(int appId, int appAttemptId, } return ret; } - + + public List takeDecreasedContainers() { + List ret = decreasedContainers; + if (ret == null) { + return null; + } + decreasedContainers = null; + synchronized (ret) { + ret.notify(); + } + return ret; + } + + public List takeIncreasedContainers() { + List ret = increasedContainers; + if (ret == null) { + return null; + } + increasedContainers = null; + synchronized (ret) { + ret.notify(); + } + return ret; + } + public List takeAllocatedContainers() { List ret = allocatedContainers; if (ret == null) { @@ -454,6 +506,34 @@ public void onContainersCompleted(List statuses) { } @Override + public void onContainersResourceDecreased(List containers) { + decreasedContainers = containers; + synchronized (decreasedContainers) { + while (decreasedContainers != null) { + try { + decreasedContainers.wait(); + } catch (InterruptedException ex) { + LOG.error("Interrupted during wait", ex); + } + } + } + } + + @Override + public void onContainersResourceIncreased(List containers) { + increasedContainers = containers; + synchronized (increasedContainers) { + while (increasedContainers != null) { + try { + increasedContainers.wait(); + } catch (InterruptedException ex) { + LOG.error("Interrupted during wait", ex); + } + } + } + } + + @Override public void onContainersAllocated(List containers) { allocatedContainers = containers; // wait for containers to be taken before returning diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java index 7d29d05..6e00a1b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java @@ -39,6 +39,7 @@ import java.util.TreeSet; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.SecurityUtil; @@ -73,6 +74,7 @@ import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.api.InvalidContainerRequestException; +import org.apache.hadoop.yarn.client.api.NMClient; import org.apache.hadoop.yarn.client.api.NMTokenCache; import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -126,6 +128,8 @@ public static void setup() throws Exception { rolling_interval_sec); conf.setLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, am_expire_ms); conf.setInt(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 100); + // set the minimum allocation so that resource decrease can go under 1024 + conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512); conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1); yarnCluster = new MiniYARNCluster(TestAMRMClient.class.getName(), nodeCount, 1, 1); yarnCluster.init(conf); @@ -730,8 +734,156 @@ public void testAskWithInvalidNodeLabels() { new ContainerRequest(Resource.newInstance(1024, 1), null, null, Priority.UNDEFINED, true, "x && y")); } - - private void testAllocation(final AMRMClientImpl amClient) + + @Test(timeout=60000) + public void testAMRMClientWithContainerResourceChange() + throws YarnException, IOException { + AMRMClient amClient = null; + try { + // start am rm client + amClient = AMRMClient.createAMRMClient(); + Assert.assertNotNull(amClient); + // asserting we are using the singleton instance cache + Assert.assertSame( + NMTokenCache.getSingleton(), amClient.getNMTokenCache()); + amClient.init(conf); + amClient.start(); + assertEquals(STATE.STARTED, amClient.getServiceState()); + // start am nm client + NMClientImpl nmClient = (NMClientImpl) NMClient.createNMClient(); + Assert.assertNotNull(nmClient); + // asserting we are using the singleton instance cache + Assert.assertSame( + NMTokenCache.getSingleton(), nmClient.getNMTokenCache()); + nmClient.init(conf); + nmClient.start(); + assertEquals(STATE.STARTED, nmClient.getServiceState()); + // am rm client register the application master with RM + amClient.registerApplicationMaster("Host", 10000, ""); + // allocate two containers and make sure they are in RUNNING state + List containers = + allocateAndStartContainers(amClient, nmClient, 2); + // perform container resource increase and decrease tests + doContainerResourceChange(amClient, containers); + // unregister and finish up the test + amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, + null, null); + } finally { + if (amClient != null && amClient.getServiceState() == STATE.STARTED) { + amClient.stop(); + } + } + } + + private List allocateAndStartContainers( + final AMRMClient amClient, final NMClient nmClient, + int num) throws YarnException, IOException { + // set up allocation requests + for (int i = 0; i < num; ++i) { + amClient.addContainerRequest( + new ContainerRequest(capability, nodes, racks, priority)); + } + // send allocation requests + amClient.allocate(0.1f); + // sleep to let NM's heartbeat to RM and trigger allocations + sleep(150); + // get allocations + AllocateResponse allocResponse = amClient.allocate(0.1f); + List containers = allocResponse.getAllocatedContainers(); + Assert.assertEquals(num, containers.size()); + // build container launch context + Credentials ts = new Credentials(); + DataOutputBuffer dob = new DataOutputBuffer(); + ts.writeTokenStorageToStream(dob); + ByteBuffer securityTokens = + ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); + // start a process long enough for increase/decrease action to take effect + ContainerLaunchContext clc = BuilderUtils.newContainerLaunchContext( + Collections.emptyMap(), + new HashMap(), Arrays.asList("sleep", "100"), + new HashMap(), securityTokens, + new HashMap()); + // start the containers and make sure they are in RUNNING state + try { + for (int i = 0; i < num; i++) { + Container container = containers.get(i); + nmClient.startContainer(container, clc); + // NodeManager may still need some time to get the stable + // container status + while (true) { + ContainerStatus status = nmClient.getContainerStatus( + container.getId(), container.getNodeId()); + if (status.getState() == ContainerState.RUNNING) { + break; + } + sleep(100); + } + } + } catch (YarnException e) { + throw new AssertionError("Exception is not expected: " + e); + } + // sleep to let NM's heartbeat to RM to confirm container launch + sleep(200); + return containers; + } + + + private void doContainerResourceChange( + final AMRMClient amClient, List containers) + throws YarnException, IOException { + Assert.assertEquals(2, containers.size()); + // remember the container IDs + ContainerId container1 = containers.get(0).getId(); + ContainerId container2 = containers.get(1).getId(); + AMRMClientImpl amClientImpl = (AMRMClientImpl) amClient; + Assert.assertNotEquals(container2, container1); + Assert.assertEquals(0, amClientImpl.increase.size()); + Assert.assertEquals(0, amClientImpl.decrease.size()); + // verify newer request overwrites older request for the container1 + amClientImpl.requestContainerResourceIncrease( + container1, Resource.newInstance(2048, 1)); + amClientImpl.requestContainerResourceIncrease( + container1, Resource.newInstance(4096, 1)); + Assert.assertEquals( + Resource.newInstance(4096, 1), amClientImpl.increase.get(container1)); + // verify new decrease request cancels old increase request for container1 + amClientImpl.requestContainerResourceDecrease( + container1, Resource.newInstance(512, 1)); + Assert.assertTrue(amClientImpl.increase.get(container1) == null); + Assert.assertEquals( + Resource.newInstance(512, 1), amClientImpl.decrease.get(container1)); + // request resource increase for container2 + amClientImpl.requestContainerResourceIncrease( + container2, Resource.newInstance(2048, 1)); + Assert.assertTrue(amClientImpl.decrease.get(container2) == null); + Assert.assertEquals( + Resource.newInstance(2048, 1), amClientImpl.increase.get(container2)); + // as of now: container1 asks to decrease to (512, 1) + // container2 asks to increase to (2048, 1) + // send allocation requests + AllocateResponse allocResponse = amClient.allocate(0.1f); + Assert.assertEquals(0, amClientImpl.increase.size()); + Assert.assertEquals(0, amClientImpl.decrease.size()); + // we should get decrease confirmation right away + List decreasedContainers = + allocResponse.getDecreasedContainers(); + List increasedContainers = + allocResponse.getIncreasedContainers(); + Assert.assertEquals(1, decreasedContainers.size()); + Assert.assertEquals(0, increasedContainers.size()); + // we should get increase allocation after the next NM's heartbeat to RM + sleep(150); + // get allocations + allocResponse = amClient.allocate(0.1f); + decreasedContainers = + allocResponse.getDecreasedContainers(); + increasedContainers = + allocResponse.getIncreasedContainers(); + Assert.assertEquals(1, increasedContainers.size()); + Assert.assertEquals(0, decreasedContainers.size()); + } + + private void testAllocation(final AMRMClientImpl amClient) throws YarnException, IOException { // setup container request