diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java index 458a7a1c5c1..d1dd4921aef 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java @@ -885,6 +885,11 @@ public void onUpdateContainerResourceError(ContainerId containerId, public void onStopContainerError(ContainerId containerId, Throwable t) { } + + @Override + public void onResyncContainerError(ContainerId containerId, Throwable t) { + LOG.error("Failed to resync " + containerId, t); + } } public ServiceMetrics getServiceMetrics() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java index cbc489c4e69..d34d0726ecb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java @@ -464,6 +464,7 @@ public void transition(Component component, ComponentEvent event) { component.getName(), container.getId(), 
instance.getCompInstanceName(), container.getNodeId(), component.pendingInstances.size()); + component.getScheduler().getNmClient().resyncContainerAsync(container); component.dispatcher.getEventHandler().handle( new ComponentInstanceEvent(container.getId(), START)); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java index a8b64cc6dd2..64865d83f64 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java @@ -281,4 +281,16 @@ public void localize(ContainerId containerId, NodeId nodeId, IOException { return null; } + + /** + *
Resync with a started container.
+ * + * @param containerId the Id of the container to resync. + * @param nodeId the Id of the NodeManager
+ *
+ * @throws YarnException YarnException.
+ * @throws IOException IOException.
+ */
+ public abstract void resyncContainer(ContainerId containerId, NodeId nodeId)
+ throws YarnException, IOException;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java
index 62e2d993e4d..b837b1dc8db 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java
@@ -177,6 +177,8 @@ protected NMClientAsync(String name, NMClient client,
public abstract void startContainerAsync(
Container container, ContainerLaunchContext containerLaunchContext);
+ /**
+  * Resync with a started container asynchronously.
+  *
+  * <p>Failures are expected to be reported through
+  * {@code AbstractCallbackHandler#onResyncContainerError} rather than being
+  * thrown to the caller.
+  *
+  * @param container the container to resync with
+  */
+ public abstract void resyncContainerAsync(Container container);
+
@Deprecated
public abstract void increaseContainerResourceAsync(Container container);
@@ -433,6 +435,14 @@ public void onRollbackLastReInitializationError(ContainerId containerId,
*/
public void onCommitLastReInitializationError(ContainerId containerId,
Throwable t) {}
+
+ /**
+  * Invoked when resyncing with a container fails. The default
+  * implementation does nothing.
+  *
+  * @param containerId the Id of the container whose resync failed
+  * @param t the error that was raised
+  */
+ public void onResyncContainerError(ContainerId containerId, Throwable t) {
+ }
}
/**
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java
index 9cd653bed4b..d85c648e017 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java
@@ -260,6 +260,27 @@ public void startContainerAsync(
}
}
+ /**
+  * Registers the container with this client and places a
+  * {@code ResyncContainerEvent} for it on the event queue.
+  *
+  * <p>The request is dropped, and only logged, when the registered callback
+  * handler is not an {@code AbstractCallbackHandler} or when the container is
+  * already tracked by this client. If the calling thread is interrupted
+  * while the event is being queued, the failure is reported through
+  * {@code AbstractCallbackHandler#onResyncContainerError} and the thread's
+  * interrupt status is restored afterwards.
+  *
+  * @param container the container to resync with
+  */
+ public void resyncContainerAsync(Container container) {
+   if (!(callbackHandler instanceof AbstractCallbackHandler)) {
+     LOG.error("Callback handler does not implement container resync "
+         + "callback methods");
+     return;
+   }
+   AbstractCallbackHandler handler = (AbstractCallbackHandler) callbackHandler;
+   // putIfAbsent returns the previous mapping, so a non-null result means the
+   // container is already tracked and must not be registered a second time.
+   if (containers.putIfAbsent(container.getId(),
+       new StatefulContainer(this, container.getId())) != null) {
+     LOG.info("Container " + container.getId() + " is already started");
+     return;
+   }
+   try {
+     events.put(new ResyncContainerEvent(container));
+   } catch (InterruptedException e) {
+     LOG.warn("Exception when scheduling the event of resyncing Container " +
+         container.getId());
+     try {
+       handler.onResyncContainerError(container.getId(), e);
+     } finally {
+       // Catching InterruptedException clears the interrupt flag; set it
+       // again (after the callback has run) so code further up the stack can
+       // still observe that this thread was interrupted.
+       Thread.currentThread().interrupt();
+     }
+   }
+ }
+
@Deprecated
public void increaseContainerResourceAsync(Container container) {
if (!(callbackHandler instanceof AbstractCallbackHandler)) {
@@ -457,7 +478,8 @@ protected ContainerEventProcessor getContainerEventProcessor(
REINITIALIZE_CONTAINER,
RESTART_CONTAINER,
ROLLBACK_LAST_REINIT,
- COMMIT_LAST_REINT
+ COMMIT_LAST_REINT,
+ RESYNC_CONTAINER
}
protected static class ContainerEvent
@@ -508,6 +530,20 @@ public ContainerLaunchContext getContainerLaunchContext() {
}
}
+ /**
+  * Event of type {@code ContainerEventType.RESYNC_CONTAINER}. It keeps a
+  * reference to the {@code Container} itself in addition to the id, node id
+  * and container token that are handed to {@code ContainerEvent}.
+  */
+ protected static class ResyncContainerEvent extends ContainerEvent {
+   // Assigned once in the constructor and never changed afterwards.
+   private final Container container;
+
+   /**
+    * @param container the container to resync with; its id, node id and
+    *                  container token are passed on to the base event
+    */
+   public ResyncContainerEvent(Container container) {
+     super(container.getId(), container.getNodeId(),
+         container.getContainerToken(), ContainerEventType.RESYNC_CONTAINER);
+     this.container = container;
+   }
+
+   /**
+    * @return the container this event was created for
+    */
+   public Container getContainer() {
+     return container;
+   }
+ }
+
protected static class ReInitializeContainerEvevnt extends ContainerEvent {
private ContainerLaunchContext containerLaunchContext;
private boolean autoCommit;
@@ -565,6 +601,10 @@ public Container getContainer() {
new StartContainerTransition())
.addTransition(ContainerState.PREP, ContainerState.DONE,
ContainerEventType.STOP_CONTAINER, new OutOfOrderTransition())
+ .addTransition(ContainerState.PREP,
+ EnumSet.of(ContainerState.RUNNING, ContainerState.FAILED),
+ ContainerEventType.RESYNC_CONTAINER,
+ new ResyncContainerTransition())
// Transitions from RUNNING state
.addTransition(ContainerState.RUNNING, ContainerState.RUNNING,
@@ -660,6 +700,30 @@ private ContainerState onExceptionRaised(StatefulContainer container,
}
}
+ protected static class ResyncContainerTransition implements
+ MultipleArcTransition