diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java index 458a7a1c5c1..d1dd4921aef 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java @@ -885,6 +885,11 @@ public void onUpdateContainerResourceError(ContainerId containerId, public void onStopContainerError(ContainerId containerId, Throwable t) { } + + @Override + public void onResyncContainerError(ContainerId containerId, Throwable t) { + LOG.error("Failed to resync " + containerId, t); + } } public ServiceMetrics getServiceMetrics() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java index cbc489c4e69..d34d0726ecb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java @@ -464,6 +464,7 @@ public void transition(Component component, ComponentEvent event) { component.getName(), container.getId(), instance.getCompInstanceName(), container.getNodeId(), component.pendingInstances.size()); + component.getScheduler().getNmClient().resyncContainerAsync(container); component.dispatcher.getEventHandler().handle( new ComponentInstanceEvent(container.getId(), START)); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java index a8b64cc6dd2..64865d83f64 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java @@ -281,4 +281,16 @@ public void localize(ContainerId containerId, NodeId nodeId, IOException { return null; } + + /** + *

Resync with an started container.

+ * + * @param containerId the Id of the container to resync. + * @param nodeId the Id of the NodeManager + * + * @throws YarnException YarnException. + * @throws IOException IOException. + */ + public abstract void resyncContainer(ContainerId containerId, NodeId nodeId) + throws YarnException, IOException; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java index 62e2d993e4d..b837b1dc8db 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java @@ -177,6 +177,8 @@ protected NMClientAsync(String name, NMClient client, public abstract void startContainerAsync( Container container, ContainerLaunchContext containerLaunchContext); + public abstract void resyncContainerAsync(Container container); + @Deprecated public abstract void increaseContainerResourceAsync(Container container); @@ -433,6 +435,14 @@ public void onRollbackLastReInitializationError(ContainerId containerId, */ public void onCommitLastReInitializationError(ContainerId containerId, Throwable t) {} + + /** + * Error Callback for container resync. + * + * @param containerId the Id of the container to resync + * @param t a Throwable. + */ + public void onResyncContainerError(ContainerId containerId, Throwable t) {} } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java index 9cd653bed4b..d85c648e017 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java @@ -260,6 +260,27 @@ public void startContainerAsync( } } + public void resyncContainerAsync(Container container) { + if (!(callbackHandler instanceof AbstractCallbackHandler)) { + LOG.error("Callback handler does not implement container resync " + + "callback methods"); + return; + } + AbstractCallbackHandler handler = (AbstractCallbackHandler) callbackHandler; + if (containers.putIfAbsent(container.getId(), + new StatefulContainer(this, container.getId())) != null) { + LOG.info("Container " + container.getId() + " is already started"); + return; + } + try { + events.put(new ResyncContainerEvent(container)); + } catch (InterruptedException e) { + LOG.warn("Exception when scheduling the event of resyncing Container " + + container.getId()); + handler.onResyncContainerError(container.getId(), e); + } + } + @Deprecated public void increaseContainerResourceAsync(Container container) { if (!(callbackHandler instanceof AbstractCallbackHandler)) { @@ -457,7 +478,8 @@ protected ContainerEventProcessor getContainerEventProcessor( REINITIALIZE_CONTAINER, RESTART_CONTAINER, ROLLBACK_LAST_REINIT, - COMMIT_LAST_REINT + COMMIT_LAST_REINT, + RESYNC_CONTAINER } protected static class ContainerEvent @@ -508,6 +530,20 @@ public ContainerLaunchContext getContainerLaunchContext() { } } + protected static class ResyncContainerEvent extends ContainerEvent { + private Container container; + + public ResyncContainerEvent(Container container) { + super(container.getId(), container.getNodeId(), + container.getContainerToken(), ContainerEventType.RESYNC_CONTAINER); + this.container = container; + } + + public Container getContainer() { + return container; + } + } + protected static class ReInitializeContainerEvevnt extends ContainerEvent { private ContainerLaunchContext containerLaunchContext; private boolean autoCommit; @@ -565,6 +601,10 @@ public Container getContainer() { new StartContainerTransition()) .addTransition(ContainerState.PREP, ContainerState.DONE, ContainerEventType.STOP_CONTAINER, new OutOfOrderTransition()) + .addTransition(ContainerState.PREP, + EnumSet.of(ContainerState.RUNNING, ContainerState.FAILED), + ContainerEventType.RESYNC_CONTAINER, + new ResyncContainerTransition()) // Transitions from RUNNING state .addTransition(ContainerState.RUNNING, ContainerState.RUNNING, @@ -660,6 +700,30 @@ private ContainerState onExceptionRaised(StatefulContainer container, } } + protected static class ResyncContainerTransition implements + MultipleArcTransition { + @Override + public ContainerState transition( + StatefulContainer container, ContainerEvent containerEvent) { + ContainerId containerId = containerEvent.getContainerId(); + AbstractCallbackHandler handler = (AbstractCallbackHandler) container + .nmClientAsync.getCallbackHandler(); + try { + if (!(containerEvent instanceof ResyncContainerEvent)) { + LOG.error("Unexpected Event.. [" + containerEvent.getType() + "]"); + return ContainerState.FAILED; + } + container.nmClientAsync.getClient().resyncContainer( + containerId, containerEvent.getNodeId()); + } catch (Throwable t) { + handler.onResyncContainerError(containerId, t); + return ContainerState.FAILED; + } + return ContainerState.RUNNING; + } + } + protected static class UpdateContainerResourceTransition implements SingleArcTransition { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java index fcc48e24974..f70010c7d56 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java @@ -174,6 +174,20 @@ public NodeId getNodeId() { } } + public void resyncContainer(ContainerId containerId, NodeId nodeId) + throws YarnException, IOException { + ContainerStatus containerStatus = getContainerStatus(containerId, nodeId); + if (containerStatus.getState() != ContainerState.RUNNING) { + throw new YarnException("Container [" + containerId + "] state is " + + containerStatus.getState()); + } + StartedContainer startingContainer = new StartedContainer(containerId, + nodeId); + synchronized (startingContainer) { + addStartingContainer(startingContainer); + } + } + private void addStartingContainer(StartedContainer startedContainer) throws YarnException { if (startedContainers.putIfAbsent(startedContainer.containerId,