diff --git a/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-infra/src/main/java/org/apache/hadoop/tools/dynamometer/ApplicationMaster.java b/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-infra/src/main/java/org/apache/hadoop/tools/dynamometer/ApplicationMaster.java index 015a5a692a0..82572050a65 100644 --- a/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-infra/src/main/java/org/apache/hadoop/tools/dynamometer/ApplicationMaster.java +++ b/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-infra/src/main/java/org/apache/hadoop/tools/dynamometer/ApplicationMaster.java @@ -692,6 +692,12 @@ public void onUpdateContainerResourceError(ContainerId containerId, Throwable t) { LOG.info("onUpdateContainerResourceError: {}", containerId, t); } + + @Override + public void onResyncContainerError(ContainerId containerId, + Throwable t) { + LOG.info("onResyncContainerError: {}", containerId, t); + } } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index bb300db26d2..4b7d6114976 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -1396,6 +1396,10 @@ public void onUpdateContainerResourceError( public void onContainerResourceUpdated(ContainerId containerId, Resource resource) { } + + @Override + public void onResyncContainerError(ContainerId containerId, Throwable t) { + } } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java index 458a7a1c5c1..d1dd4921aef 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java @@ -885,6 +885,11 @@ public void onUpdateContainerResourceError(ContainerId containerId, public void onStopContainerError(ContainerId containerId, Throwable t) { } + + @Override + public void onResyncContainerError(ContainerId containerId, Throwable t) { + LOG.error("Failed to resync " + containerId, t); + } } public ServiceMetrics getServiceMetrics() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java index cbc489c4e69..d34d0726ecb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java @@ -464,6 +464,7 @@ public void transition(Component component, ComponentEvent event) { component.getName(), container.getId(), instance.getCompInstanceName(), container.getNodeId(), component.pendingInstances.size()); + component.getScheduler().getNmClient().resyncContainerAsync(container); component.dispatcher.getEventHandler().handle( new ComponentInstanceEvent(container.getId(), START)); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java index a8b64cc6dd2..d528de1eaf3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java @@ -281,4 +281,16 @@ public void localize(ContainerId containerId, NodeId nodeId, IOException { return null; } + + /** + *

Rsync with an started container.

+ * + * @param containerId the Id of the container to resync. + * @param nodeId the Id of the NodeManager + * + * @throws YarnException YarnException. + * @throws IOException IOException. + */ + public abstract void resyncContainer(ContainerId containerId, NodeId nodeId) + throws YarnException, IOException; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java index 62e2d993e4d..f5d8bd4a256 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java @@ -177,6 +177,8 @@ protected NMClientAsync(String name, NMClient client, public abstract void startContainerAsync( Container container, ContainerLaunchContext containerLaunchContext); + public abstract void resyncContainerAsync(Container container); + @Deprecated public abstract void increaseContainerResourceAsync(Container container); @@ -512,6 +514,15 @@ void onContainerStatusReceived(ContainerId containerId, */ void onStopContainerError(ContainerId containerId, Throwable t); + /** + * The API is called when an exception is raised in the process of + * resyncing a container + * + * @param containerId the Id of the container + * @param t the raised exception + */ + void onResyncContainerError(ContainerId containerId, Throwable t); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java index 9cd653bed4b..b3022a161a2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java @@ -260,6 +260,21 @@ public void startContainerAsync( } } + public void resyncContainerAsync(Container container) { + if (containers.putIfAbsent(container.getId(), + new StatefulContainer(this, container.getId())) != null) { + LOG.info("Container " + container.getId() + " is already started"); + return; + } + try { + events.put(new ResyncContainerEvent(container)); + } catch (InterruptedException e) { + LOG.warn("Exception when scheduling the event of resyncing Container " + + container.getId()); + callbackHandler.onResyncContainerError(container.getId(), e); + } + } + @Deprecated public void increaseContainerResourceAsync(Container container) { if (!(callbackHandler instanceof AbstractCallbackHandler)) { @@ -457,7 +472,8 @@ protected ContainerEventProcessor getContainerEventProcessor( REINITIALIZE_CONTAINER, RESTART_CONTAINER, ROLLBACK_LAST_REINIT, - COMMIT_LAST_REINT + COMMIT_LAST_REINT, + RESYNC_CONTAINER } protected static class ContainerEvent @@ -508,6 +524,20 @@ public ContainerLaunchContext getContainerLaunchContext() { } } + protected static class ResyncContainerEvent extends ContainerEvent { + private Container container; + + public ResyncContainerEvent(Container container) { + super(container.getId(), container.getNodeId(), + container.getContainerToken(), ContainerEventType.RESYNC_CONTAINER); + this.container = container; + } + + public Container getContainer() { + return container; + } + } + protected static class ReInitializeContainerEvevnt extends ContainerEvent { private ContainerLaunchContext containerLaunchContext; private boolean autoCommit; @@ -565,6 +595,10 @@ public Container getContainer() { new StartContainerTransition()) .addTransition(ContainerState.PREP, ContainerState.DONE, ContainerEventType.STOP_CONTAINER, new OutOfOrderTransition()) + .addTransition(ContainerState.PREP, + EnumSet.of(ContainerState.RUNNING, ContainerState.FAILED), + ContainerEventType.RESYNC_CONTAINER, + new ResyncContainerTransition()) // Transitions from RUNNING state .addTransition(ContainerState.RUNNING, ContainerState.RUNNING, @@ -660,6 +694,46 @@ private ContainerState onExceptionRaised(StatefulContainer container, } } + protected static class ResyncContainerTransition implements + MultipleArcTransition { + @Override + public ContainerState transition( + StatefulContainer container, ContainerEvent event) { + ContainerId containerId = event.getContainerId(); + ResyncContainerEvent scEvent = null; + if (event instanceof ResyncContainerEvent) { + scEvent = (ResyncContainerEvent) event; + } + assert scEvent != null; + try { + container.nmClientAsync.getClient().resyncContainer( + containerId, scEvent.getNodeId()); + return ContainerState.RUNNING; + } catch (YarnException e) { + return onExceptionRaised(container, event, e); + } catch (IOException e) { + return onExceptionRaised(container, event, e); + } catch (Throwable t) { + return onExceptionRaised(container, event, t); + } + } + + private ContainerState onExceptionRaised(StatefulContainer container, + ContainerEvent event, Throwable t) { + try { + container.nmClientAsync.getCallbackHandler().onResyncContainerError( + event.getContainerId(), t); + } catch (Throwable thr) { + // Don't process user created unchecked exception + LOG.info( + "Unchecked exception is thrown from onRsyncContainerError for " + + "Container " + event.getContainerId(), thr); + } + return ContainerState.FAILED; + } + } + protected static class UpdateContainerResourceTransition implements SingleArcTransition { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java index fcc48e24974..f70010c7d56 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java @@ -174,6 +174,20 @@ public NodeId getNodeId() { } } + public void resyncContainer(ContainerId containerId, NodeId nodeId) + throws YarnException, IOException { + ContainerStatus containerStatus = getContainerStatus(containerId, nodeId); + if (containerStatus.getState() != ContainerState.RUNNING) { + throw new YarnException("Container [" + containerId + "] state is " + + containerStatus.getState()); + } + StartedContainer startingContainer = new StartedContainer(containerId, + nodeId); + synchronized (startingContainer) { + addStartingContainer(startingContainer); + } + } + private void addStartingContainer(StartedContainer startedContainer) throws YarnException { if (startedContainers.putIfAbsent(startedContainer.containerId, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestNMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestNMClientAsync.java index 40038b1cabe..bae6da297aa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestNMClientAsync.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestNMClientAsync.java @@ -628,6 +628,10 @@ public void onGetContainerStatusError(ContainerId containerId, throw new RuntimeException("Ignorable Exception"); } + @Override + public void onResyncContainerError(ContainerId containerId, Throwable t) { + } + public boolean isAllSuccessCallsExecuted() { boolean isAllSuccessCallsExecuted = testMap.get(OpsToTest.START).success.get() == expectedSuccess && @@ -906,6 +910,10 @@ public void onUpdateContainerResourceError(ContainerId containerId, @Override public void onStopContainerError(ContainerId containerId, Throwable t) { } + + @Override + public void onResyncContainerError(ContainerId containerId, Throwable t) { + } } private Container mockContainer(int i) {