diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java index 08b911b..47270f5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java @@ -87,6 +87,22 @@ protected NMClient(String name) { throws YarnException, IOException; /** + *

Increase the resource of a container.

+ * + *

The ApplicationMaster or other applications that use the + * client must provide the details of the container, including the Id and + * the target resource encapsulated in the updated container token via + * {@link Container}. + *

+ * + * @param container the container with updated token + * @throws YarnException + * @throws IOException + */ + public abstract void increaseContainerResource(Container container) + throws YarnException, IOException; + + /** *

Stop a started container.

* * @param containerId the Id of the started container diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java index 5cb504d..e103787 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java @@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.client.api.NMClient; import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl; @@ -136,6 +137,8 @@ protected NMClientAsync(String name, NMClient client, public abstract void startContainerAsync( Container container, ContainerLaunchContext containerLaunchContext); + public abstract void increaseContainerResourceAsync(Container container); + public abstract void stopContainerAsync( ContainerId containerId, NodeId nodeId); @@ -160,6 +163,33 @@ public void setCallbackHandler(CallbackHandler callbackHandler) { /** *

+ * An abstract callback class which implements the CallbackHandler interface + * and defines additional methods. + *

+ */ + public abstract static class AbstractCallbackHandler + implements CallbackHandler { + /** + * The API is called when NodeManager responds to indicate + * the container resource has been successfully increased. + * @param containerId the Id of the container + * @param resource the target resource of the container + */ + public abstract void onContainerResourceIncreased( + ContainerId containerId, Resource resource); + + /** + * The API is called when an exception is raised in the process of + * increasing container resource. + * @param containerId the Id of the container + * @param t the raised exception + */ + public abstract void onIncreaseContainerResourceError( + ContainerId containerId, Throwable t); + } + + /** + *

* The callback interface needs to be implemented by {@link NMClientAsync} * users. The APIs are called when responses from NodeManager are * available. @@ -178,7 +208,7 @@ public void setCallbackHandler(CallbackHandler callbackHandler) { * catch, log and then ignore it. *

*/ - public static interface CallbackHandler { + public interface CallbackHandler { /** * The API is called when NodeManager responds to indicate its * acceptance of the starting container request diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java index 39682df..a0ad8ae 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java @@ -229,6 +229,29 @@ public void startContainerAsync( } } + public void increaseContainerResourceAsync(Container container) { + if (!(callbackHandler instanceof AbstractCallbackHandler)) { + LOG.error("Callback handler does not implement container resource " + + "increase callback methods"); + return; + } + AbstractCallbackHandler handler = (AbstractCallbackHandler) callbackHandler; + if (containers.get(container.getId()) == null) { + handler.onIncreaseContainerResourceError( + container.getId(), + RPCUtil.getRemoteException( + "Container " + container.getId() + + " is neither started nor scheduled to start")); + } + try { + events.put(new IncreaseContainerResourceEvent(container)); + } catch (InterruptedException e) { + LOG.warn("Exception when scheduling the event of increasing resource of " + + "Container " + container.getId()); + handler.onIncreaseContainerResourceError(container.getId(), e); + } + } + public void stopContainerAsync(ContainerId containerId, NodeId nodeId) { if (containers.get(containerId) == null) { callbackHandler.onStopContainerError(containerId, @@ -276,7 +299,8 @@ protected ContainerEventProcessor getContainerEventProcessor( protected static enum ContainerEventType { 
START_CONTAINER, STOP_CONTAINER, - QUERY_CONTAINER + QUERY_CONTAINER, + INCREASE_CONTAINER_RESOURCE } protected static class ContainerEvent @@ -327,6 +351,21 @@ public ContainerLaunchContext getContainerLaunchContext() { } } + protected static class IncreaseContainerResourceEvent extends ContainerEvent { + private Container container; + + public IncreaseContainerResourceEvent(Container container) { + super(container.getId(), container.getNodeId(), + container.getContainerToken(), + ContainerEventType.INCREASE_CONTAINER_RESOURCE); + this.container = container; + } + + public Container getContainer() { + return container; + } + } + protected static class StatefulContainer implements EventHandler { @@ -344,7 +383,9 @@ public ContainerLaunchContext getContainerLaunchContext() { ContainerEventType.STOP_CONTAINER, new OutOfOrderTransition()) // Transitions from RUNNING state - // RUNNING -> RUNNING should be the invalid transition + .addTransition(ContainerState.RUNNING, ContainerState.RUNNING, + ContainerEventType.INCREASE_CONTAINER_RESOURCE, + new IncreaseContainerResourceTransition()) .addTransition(ContainerState.RUNNING, EnumSet.of(ContainerState.DONE, ContainerState.FAILED), ContainerEventType.STOP_CONTAINER, @@ -410,6 +451,52 @@ private ContainerState onExceptionRaised(StatefulContainer container, } } + protected static class IncreaseContainerResourceTransition implements + SingleArcTransition { + @Override + public void transition( + StatefulContainer container, ContainerEvent event) { + if (!(container.nmClientAsync.getCallbackHandler() + instanceof AbstractCallbackHandler)) { + LOG.error("Callback handler does not implement container resource " + + "increase callback methods"); + return; + } + AbstractCallbackHandler handler = + (AbstractCallbackHandler) container.nmClientAsync + .getCallbackHandler(); + try { + if (!(event instanceof IncreaseContainerResourceEvent)) { + throw new AssertionError("Unexpected event type. 
Expecting:" + + "IncreaseContainerResourceEvent. Got:" + event); + } + IncreaseContainerResourceEvent increaseEvent = + (IncreaseContainerResourceEvent) event; + container.nmClientAsync.getClient().increaseContainerResource( + increaseEvent.getContainer()); + try { + handler.onContainerResourceIncreased( + increaseEvent.getContainerId(), increaseEvent.getContainer() + .getResource()); + } catch (Throwable thr) { + // Don't process user created unchecked exception + LOG.info("Unchecked exception is thrown from " + + "onContainerResourceIncreased for Container " + + event.getContainerId(), thr); + } + } catch (Exception e) { + try { + handler.onIncreaseContainerResourceError(event.getContainerId(), e); + } catch (Throwable thr) { + // Don't process user created unchecked exception + LOG.info("Unchecked exception is thrown from " + + "onIncreaseContainerResourceError for Container " + + event.getContainerId(), thr); + } + } + } + } + protected static class StopContainerTransition implements MultipleArcTransition { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java index 3518f35..36d6e71 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java @@ -35,6 +35,8 @@ import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse; import 
org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; @@ -147,8 +149,7 @@ public void cleanupRunningContainersOnStop(boolean enabled) { private ContainerState state; - public StartedContainer(ContainerId containerId, NodeId nodeId, - Token containerToken) { + public StartedContainer(ContainerId containerId, NodeId nodeId) { this.containerId = containerId; this.nodeId = nodeId; state = ContainerState.NEW; @@ -232,6 +233,35 @@ private void addStartingContainer(StartedContainer startedContainer) } @Override + public void increaseContainerResource(Container container) + throws YarnException, IOException { + ContainerManagementProtocolProxyData proxy = null; + try { + proxy = + cmProxy.getProxy(container.getNodeId().toString(), + container.getId()); + List increaseTokens = new ArrayList<>(); + increaseTokens.add(container.getContainerToken()); + IncreaseContainersResourceRequest increaseRequest = + IncreaseContainersResourceRequest + .newInstance(increaseTokens); + IncreaseContainersResourceResponse response = + proxy.getContainerManagementProtocol() + .increaseContainersResource(increaseRequest); + if (response.getFailedRequests() != null + && response.getFailedRequests().containsKey(container.getId())) { + Throwable t = response.getFailedRequests().get(container.getId()) + .deSerialize(); + parseAndThrowException(t); + } + } finally { + if (proxy != null) { + cmProxy.mayBeCloseProxy(proxy); + } + } + } + + @Override public void stopContainer(ContainerId containerId, NodeId nodeId) throws YarnException, IOException { StartedContainer startedContainer = getStartedContainer(containerId); @@ -308,7 +338,7 @@ private void stopContainerInternal(ContainerId containerId, NodeId nodeId) protected synchronized StartedContainer createStartedContainer( Container container) throws YarnException, IOException { 
StartedContainer startedContainer = new StartedContainer(container.getId(), - container.getNodeId(), container.getContainerToken()); + container.getNodeId()); return startedContainer; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestNMClientAsync.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestNMClientAsync.java index 6f9d41d..cd5d970 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestNMClientAsync.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestNMClientAsync.java @@ -36,6 +36,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicIntegerArray; +import org.apache.hadoop.yarn.api.records.Resource; import org.junit.Assert; import org.apache.hadoop.conf.Configuration; @@ -50,7 +51,6 @@ import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.client.api.NMClient; import org.apache.hadoop.yarn.client.api.async.NMClientAsync; -import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.factories.RecordFactory; @@ -117,6 +117,10 @@ public void testNMClientAsync() throws Exception { asyncClient.startContainerAsync(container, clc); } while (!((TestCallbackHandler1) asyncClient.getCallbackHandler()) + .isIncreaseResourceFailureCallsExecuted()) { + Thread.sleep(10); + } + while (!((TestCallbackHandler1) asyncClient.getCallbackHandler()) .isStopFailureCallsExecuted()) { Thread.sleep(10); } @@ -183,7 +187,7 @@ protected ContainerEventProcessor getContainerEventProcessor( } private class TestCallbackHandler1 - implements NMClientAsync.CallbackHandler { + extends 
NMClientAsync.AbstractCallbackHandler { private boolean path = true; @@ -196,6 +200,10 @@ protected ContainerEventProcessor getContainerEventProcessor( private AtomicInteger actualQueryFailure = new AtomicInteger(0); private AtomicInteger actualStopSuccess = new AtomicInteger(0); private AtomicInteger actualStopFailure = new AtomicInteger(0); + private AtomicInteger actualIncreaseResourceSuccess = + new AtomicInteger(0); + private AtomicInteger actualIncreaseResourceFailure = + new AtomicInteger(0); private AtomicIntegerArray actualStartSuccessArray; private AtomicIntegerArray actualStartFailureArray; @@ -203,6 +211,8 @@ protected ContainerEventProcessor getContainerEventProcessor( private AtomicIntegerArray actualQueryFailureArray; private AtomicIntegerArray actualStopSuccessArray; private AtomicIntegerArray actualStopFailureArray; + private AtomicIntegerArray actualIncreaseResourceSuccessArray; + private AtomicIntegerArray actualIncreaseResourceFailureArray; private Set errorMsgs = Collections.synchronizedSet(new HashSet()); @@ -217,6 +227,10 @@ public TestCallbackHandler1(int expectedSuccess, int expectedFailure) { actualQueryFailureArray = new AtomicIntegerArray(expectedFailure); actualStopSuccessArray = new AtomicIntegerArray(expectedSuccess); actualStopFailureArray = new AtomicIntegerArray(expectedFailure); + actualIncreaseResourceSuccessArray = + new AtomicIntegerArray(expectedSuccess); + actualIncreaseResourceFailureArray = + new AtomicIntegerArray(expectedFailure); } @SuppressWarnings("deprecation") @@ -236,7 +250,11 @@ public void onContainerStarted(ContainerId containerId, asyncClient.getContainerStatusAsync(containerId, nodeId); } else { // move on to the following failure tests - asyncClient.stopContainerAsync(containerId, nodeId); + // make sure we pass in the container with the same + // containerId + Container container = Container.newInstance( + containerId, nodeId, null, null, null, containerToken); + 
asyncClient.increaseContainerResourceAsync(container); } // Shouldn't crash the test thread @@ -255,7 +273,11 @@ public void onContainerStatusReceived(ContainerId containerId, actualQuerySuccess.addAndGet(1); actualQuerySuccessArray.set(containerId.getId(), 1); // move on to the following success tests - asyncClient.stopContainerAsync(containerId, nodeId); + // make sure we pass in the container with the same + // containerId + Container container = Container.newInstance( + containerId, nodeId, null, null, null, containerToken); + asyncClient.increaseContainerResourceAsync(container); // Shouldn't crash the test thread throw new RuntimeException("Ignorable Exception"); @@ -263,6 +285,23 @@ public void onContainerStatusReceived(ContainerId containerId, @SuppressWarnings("deprecation") @Override + public void onContainerResourceIncreased( + ContainerId containerId, Resource resource) { + if (containerId.getId() >= expectedSuccess) { + errorMsgs.add("Container " + containerId + + " should throw the exception onContainerResourceIncreased"); + return; + } + actualIncreaseResourceSuccess.addAndGet(1); + actualIncreaseResourceSuccessArray.set(containerId.getId(), 1); + // move on to the following success tests + asyncClient.stopContainerAsync(containerId, nodeId); + // throw a fake user exception, and shouldn't crash the test + throw new RuntimeException("Ignorable Exception"); + } + + @SuppressWarnings("deprecation") + @Override public void onContainerStopped(ContainerId containerId) { if (containerId.getId() >= expectedSuccess) { errorMsgs.add("Container " + containerId + @@ -302,6 +341,26 @@ public void onStartContainerError(ContainerId containerId, Throwable t) { @SuppressWarnings("deprecation") @Override + public void onIncreaseContainerResourceError( + ContainerId containerId, Throwable t) { + if (containerId.getId() < expectedSuccess + expectedFailure) { + errorMsgs.add("Container " + containerId + + " shouldn't throw the exception 
onIncreaseContainerResourceError"); + return; + } + actualIncreaseResourceFailure.addAndGet(1); + actualIncreaseResourceFailureArray.set( + containerId.getId() - expectedSuccess - expectedFailure, 1); + // increase container resource error should NOT change the + // the container status to FAILED + // move on to the following failure tests + asyncClient.stopContainerAsync(containerId, nodeId); + // Shouldn't crash the test thread + throw new RuntimeException("Ignorable Exception"); + } + + @SuppressWarnings("deprecation") + @Override public void onStopContainerError(ContainerId containerId, Throwable t) { if (t instanceof RuntimeException) { errorMsgs.add("Unexpected throwable from callback functions should be" + @@ -345,10 +404,12 @@ public boolean isAllSuccessCallsExecuted() { boolean isAllSuccessCallsExecuted = actualStartSuccess.get() == expectedSuccess && actualQuerySuccess.get() == expectedSuccess && + actualIncreaseResourceSuccess.get() == expectedSuccess && actualStopSuccess.get() == expectedSuccess; if (isAllSuccessCallsExecuted) { assertAtomicIntegerArray(actualStartSuccessArray); assertAtomicIntegerArray(actualQuerySuccessArray); + assertAtomicIntegerArray(actualIncreaseResourceSuccessArray); assertAtomicIntegerArray(actualStopSuccessArray); } return isAllSuccessCallsExecuted; @@ -365,6 +426,15 @@ public boolean isStartAndQueryFailureCallsExecuted() { return isStartAndQueryFailureCallsExecuted; } + public boolean isIncreaseResourceFailureCallsExecuted() { + boolean isIncreaseResourceFailureCallsExecuted = + actualIncreaseResourceFailure.get() == expectedFailure; + if (isIncreaseResourceFailureCallsExecuted) { + assertAtomicIntegerArray(actualIncreaseResourceFailureArray); + } + return isIncreaseResourceFailureCallsExecuted; + } + public boolean isStopFailureCallsExecuted() { boolean isStopFailureCallsExecuted = actualStopFailure.get() == expectedFailure; @@ -392,6 +462,8 @@ private NMClient mockNMClient(int mode) 
when(client.getContainerStatus(any(ContainerId.class), any(NodeId.class))).thenReturn( recordFactory.newRecordInstance(ContainerStatus.class)); + doNothing().when(client).increaseContainerResource( + any(Container.class)); doNothing().when(client).stopContainer(any(ContainerId.class), any(NodeId.class)); break; @@ -411,6 +483,8 @@ private NMClient mockNMClient(int mode) when(client.getContainerStatus(any(ContainerId.class), any(NodeId.class))).thenReturn( recordFactory.newRecordInstance(ContainerStatus.class)); + doThrow(RPCUtil.getRemoteException("Increase Resource Exception")) + .when(client).increaseContainerResource(any(Container.class)); doThrow(RPCUtil.getRemoteException("Stop Exception")).when(client) .stopContainer(any(ContainerId.class), any(NodeId.class)); } @@ -539,7 +613,6 @@ public void onGetContainerStatusError(ContainerId containerId, @Override public void onStopContainerError(ContainerId containerId, Throwable t) { } - } private Container mockContainer(int i) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java index 0d4a271..cd04130 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java @@ -210,10 +210,10 @@ public void testNMClientNoCleanupOnStop() testContainerManagement(nmClient, allocateContainers(rmClient, 5)); rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, - null, null); + null, null); // don't stop the running containers stopNmClient(false); - assertFalse(nmClient.startedContainers. 
isEmpty()); + assertFalse(nmClient.startedContainers.isEmpty()); //now cleanup nmClient.cleanupRunningContainers(); assertEquals(0, nmClient.startedContainers.size()); @@ -298,6 +298,16 @@ private void testContainerManagement(NMClientImpl nmClient, e.getMessage().contains("is not handled by this NodeManager")); } + // increaseContainerResource shouldn't be called before startContainer, + // otherwise, NodeManager cannot find the container + try { + nmClient.increaseContainerResource(container); + fail("Exception is expected"); + } catch (YarnException e) { + assertTrue("The thrown exception is not expected", + e.getMessage().contains("is not handled by this NodeManager")); + } + // stopContainer shouldn't be called before startContainer, // otherwise, an exception will be thrown try { @@ -332,6 +342,8 @@ private void testContainerManagement(NMClientImpl nmClient, // NodeManager may still need some time to make the container started testGetContainerStatus(container, i, ContainerState.RUNNING, "", Arrays.asList(new Integer[] {-1000})); + // Test increase container API and make sure requests can reach NM + testIncreaseContainerResource(container); try { nmClient.stopContainer(container.getId(), container.getNodeId()); @@ -397,4 +409,19 @@ private void testGetContainerStatus(Container container, int index, } } + private void testIncreaseContainerResource(Container container) + throws YarnException, IOException { + try { + nmClient.increaseContainerResource(container); + } catch (YarnException e) { + // NM container will only be in LOCALIZED state, so expect the increase + // action to fail. + if (!e.getMessage().contains( + "can only be changed when a container is in RUNNING state")) { + throw (AssertionError) + (new AssertionError("Exception is not expected: " + e) + .initCause(e)); + } + } + } }