diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java
index 6c3a4d6..cc0dd2b 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java
@@ -32,6 +32,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.InitializeContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.InitializeContainersResponse;
import org.junit.Assert;
import org.apache.commons.logging.Log;
@@ -56,6 +58,8 @@
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.DestroyContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.DestroyContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
@@ -73,6 +77,7 @@
import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy;
import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy.ContainerManagementProtocolProxyData;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC;
@@ -460,5 +465,21 @@ public IncreaseContainersResourceResponse increaseContainersResource(
"Dummy function cause"));
throw new IOException(e);
}
+
+ @Override
+ public InitializeContainersResponse initializeContainers(
+ InitializeContainersRequest request) throws YarnException, IOException {
+ Exception e = new Exception("Dummy function", new Exception(
+ "Dummy function cause"));
+ throw new IOException(e);
+ }
+
+ @Override
+ public DestroyContainersResponse destroyContainers(
+ DestroyContainersRequest request) throws YarnException, IOException {
+ Exception e = new Exception("Dummy function", new Exception(
+ "Dummy function cause"));
+ throw new IOException(e);
+ }
}
}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java
index 610448c..2183d38 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java
@@ -48,6 +48,10 @@
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.InitializeContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.InitializeContainersResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.DestroyContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.DestroyContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
@@ -463,6 +467,18 @@ public IncreaseContainersResourceResponse increaseContainersResource(
}
@Override
+ public InitializeContainersResponse initializeContainers(
+ InitializeContainersRequest request) throws YarnException, IOException {
+ return null;
+ }
+
+ @Override
+ public DestroyContainersResponse destroyContainers(
+ DestroyContainersRequest request) throws YarnException, IOException {
+ return null;
+ }
+
+ @Override
public void close() throws IOException {
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ContainerManagementProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ContainerManagementProtocol.java
index 43e1d4c..1362920 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ContainerManagementProtocol.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ContainerManagementProtocol.java
@@ -25,6 +25,11 @@
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.InitializeContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.InitializeContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.InitializeContainersResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.DestroyContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.DestroyContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
@@ -194,4 +199,87 @@ GetContainerStatusesResponse getContainerStatuses(
IncreaseContainersResourceResponse increaseContainersResource(
IncreaseContainersResourceRequest request) throws YarnException,
IOException;
+
+ /**
+ *
+ * The ApplicationMaster provides a list of
+ * {@link InitializeContainerRequest}s to a NodeManager to
+ * initialize {@link Container}s allocated to it using this
+ * interface.
+ *
+ *
+ *
+ * The ApplicationMaster has to provide details such as allocated
+ * resource capability, security tokens (if enabled), command to be executed
+ * to initialize the container, environment for the process, necessary
+ * binaries/jar/shared-objects etc. via the {@link ContainerLaunchContext} in
+ * the {@link InitializeContainerRequest}.
+ *
+ *
+ *
+ * The NodeManager sends a response via
+ * {@link InitializeContainersResponse} which includes a list of
+ * {@link ContainerId}s of successfully initialized {@link Container}s, a
+ * containerId-to-exception map for each failed
+ * {@link InitializeContainerRequest} in which the exception indicates errors
+ * from each container and an allServicesMetaData map between the names of
+ * auxiliary services and their corresponding meta-data. Note:
+ * Non-container-specific exceptions will still be thrown by the API method
+ * itself.
+ *
+ *
+ * The ApplicationMaster can use
+ * {@link #getContainerStatuses(GetContainerStatusesRequest)} to get updated
+ * statuses of the to-be-initialized or initialized containers.
+ *
+ *
+ * @param request request to initialize a list of containers
+ * @return response including containerIds of all successfully initialized
+ * containers, a containerId-to-exception map for failed requests and
+ * an allServicesMetaData map.
+ * @throws YarnException
+ * @throws IOException
+ * @throws NMNotYetReadyException This exception is thrown when NM starts from
+ * scratch but has not yet connected with RM.
+ */
+ @Public
+ @Unstable
+ InitializeContainersResponse initializeContainers(
+ InitializeContainersRequest request) throws YarnException, IOException;
+
+ /**
+ *
+ * The ApplicationMaster requests a NodeManager to
+ * destroy a list of {@link Container}s allocated to it using this
+ * interface.
+ *
+ *
+ *
+ * The ApplicationMaster sends a {@link DestroyContainersRequest}
+ * which includes the {@link ContainerId}s of the containers to be destroyed.
+ *
+ *
+ *
+ * The NodeManager sends a response via
+ * {@link DestroyContainersResponse} which includes a list of
+ * {@link ContainerId} s of successfully destroyed containers, a
+ * containerId-to-exception map for each failed request in which the exception
+ * indicates errors from each container. Note: Non-container-specific
+ * exceptions will still be thrown by the API method itself.
+ * ApplicationMaster can use
+ * {@link #getContainerStatuses(GetContainerStatusesRequest)} to get updated
+ * statuses of the containers.
+ *
+ *
+ * @param request request to destroy a list of containers
+ * @return response which includes a list of containerIds of successfully
+ * destroyed containers, a containerId-to-exception map for failed
+ * requests.
+ * @throws YarnException
+ * @throws IOException
+ */
+ @Public
+ @Unstable
+ DestroyContainersResponse destroyContainers(DestroyContainersRequest request)
+ throws YarnException, IOException;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerExitStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerExitStatus.java
index f88fa3b..9920e73 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerExitStatus.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerExitStatus.java
@@ -58,7 +58,7 @@
public static final int KILLED_EXCEEDED_PMEM = -104;
/**
- * Container was terminated by stop request by the app master.
+ * Container was terminated by stop/destroy request by the app master.
*/
public static final int KILLED_BY_APPMASTER = -105;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java
index 582389f..65e8983 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java
@@ -37,5 +37,8 @@
COMPLETE,
/** Queued at the NM. */
- QUEUED
+ QUEUED,
+
+ /** Waiting to be started */
+ AWAITING_START
}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index a4213ce..aa20f0e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -854,6 +854,15 @@ public static boolean isAclEnabled(Configuration conf) {
NM_PREFIX + "container-diagnostics-maximum-size";
public static final int DEFAULT_NM_CONTAINER_DIAGNOSTICS_MAXIMUM_SIZE = 10000;
+ /**
+ * Default delay, in milliseconds, before a container is destroyed after its
+ * execution completes.
+ **/
+ public static final String NM_CONTAINER_DESTROY_DELAY_MS =
+ NM_PREFIX + "container-destroy-delay-ms";
+ public static final int DEFAULT_NM_CONTAINER_DESTROY_DELAY_MS =
+ 10 * 60 * 1000;
+
/** Interval at which the delayed token removal thread runs */
public static final String RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS =
RM_PREFIX + "delayed.delegation-token.removal-interval-ms";
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/containermanagement_protocol.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/containermanagement_protocol.proto
index f06f6cb..5bd41e2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/containermanagement_protocol.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/containermanagement_protocol.proto
@@ -35,4 +35,6 @@ service ContainerManagementProtocolService {
rpc stopContainers(StopContainersRequestProto) returns (StopContainersResponseProto);
rpc getContainerStatuses(GetContainerStatusesRequestProto) returns (GetContainerStatusesResponseProto);
rpc increaseContainersResource(IncreaseContainersResourceRequestProto) returns (IncreaseContainersResourceResponseProto);
+ rpc initializeContainers(InitializeContainersRequestProto) returns (InitializeContainersResponseProto);
+ rpc destroyContainers(DestroyContainersRequestProto) returns (DestroyContainersResponseProto);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index 60cdfd1..e5c193d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -83,6 +83,7 @@ enum ContainerStateProto {
C_RUNNING = 2;
C_COMPLETE = 3;
C_QUEUED = 4;
+ C_AWAITING_START = 5;
}
message ContainerProto {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
index bdf022f..51200ab 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
@@ -262,6 +262,11 @@ message StopContainerRequestProto {
message StopContainerResponseProto {
}
+message InitializeContainerRequestProto {
+ optional ContainerLaunchContextProto container_launch_context = 1;
+ optional hadoop.common.TokenProto container_token = 2;
+ optional int64 destroy_delay = 3; // NOTE(review): unit presumably ms - confirm
+}
//// bulk API records
message StartContainersRequestProto {
@@ -306,6 +311,25 @@ message IncreaseContainersResourceResponseProto {
repeated ContainerExceptionMapProto failed_requests = 2;
}
+message InitializeContainersRequestProto {
+ repeated InitializeContainerRequestProto initialize_container_request = 1;
+}
+
+message InitializeContainersResponseProto {
+ repeated StringBytesMapProto services_meta_data = 1;
+ repeated ContainerIdProto succeeded_requests = 2;
+ repeated ContainerExceptionMapProto failed_requests = 3;
+}
+
+message DestroyContainersRequestProto {
+ repeated ContainerIdProto container_id = 1;
+}
+
+message DestroyContainersResponseProto {
+ repeated ContainerIdProto succeeded_requests = 1;
+ repeated ContainerExceptionMapProto failed_requests = 2;
+}
+
//////////////////////////////////////////////////////
/////// Application_History_Protocol /////////////////
//////////////////////////////////////////////////////
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java
index 47270f5..53e76e8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java
@@ -165,4 +165,36 @@ public NMTokenCache getNMTokenCache() {
return nmTokenCache;
}
+ /**
+ * Initialize an allocated container.
+ *
+ * The ApplicationMaster or other applications that use the
+ * client must provide the details of the allocated container, including the
+ * Id, the assigned node's Id and the token via {@link Container}. In
+ * addition, the AM needs to provide the {@link ContainerLaunchContext} as
+ * well.
+ *
+ * @param container the allocated container
+ * @param containerLaunchContext the context information needed by the
+ * NodeManager to initialize the
+ * container
+ * @return a map between the auxiliary service names and their outputs
+ * @throws YarnException
+ * @throws IOException
+ */
+ public abstract Map initializeContainer(Container container,
+ ContainerLaunchContext containerLaunchContext)
+ throws YarnException, IOException;
+
+ /**
+ * Destroy a started container.
+ *
+ * @param containerId the Id of the container to destroy
+ * @param nodeId the Id of the NodeManager
+ *
+ * @throws YarnException
+ * @throws IOException
+ */
+ public abstract void destroyContainer(ContainerId containerId, NodeId nodeId)
+ throws YarnException, IOException;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java
index dc92cda..10820ad 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java
@@ -33,10 +33,15 @@
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.yarn.api.protocolrecords.DestroyContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.DestroyContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.InitializeContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.InitializeContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.InitializeContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
@@ -115,7 +120,7 @@ protected void serviceStop() throws Exception {
protected synchronized void cleanupRunningContainers() {
for (StartedContainer startedContainer : startedContainers.values()) {
try {
- stopContainer(startedContainer.getContainerId(),
+ destroyContainer(startedContainer.getContainerId(),
startedContainer.getNodeId());
} catch (YarnException e) {
LOG.error("Failed to stop Container " +
@@ -147,12 +152,14 @@ public void cleanupRunningContainersOnStop(boolean enabled) {
private ContainerId containerId;
private NodeId nodeId;
private ContainerState state;
+ private boolean multiStart;
-
- public StartedContainer(ContainerId containerId, NodeId nodeId) {
+ public StartedContainer(ContainerId containerId, NodeId nodeId,
+ boolean multiStart) {
this.containerId = containerId;
this.nodeId = nodeId;
state = ContainerState.NEW;
+ this.multiStart = multiStart;
}
public ContainerId getContainerId() {
@@ -162,15 +169,26 @@ public ContainerId getContainerId() {
public NodeId getNodeId() {
return nodeId;
}
+
+ public boolean allowsRestart() {
+ return multiStart;
+ }
}
- private void addStartingContainer(StartedContainer startedContainer)
+ private StartedContainer addStartingContainer(StartedContainer startedContainer)
throws YarnException {
- if (startedContainers.putIfAbsent(startedContainer.containerId,
- startedContainer) != null) {
- throw RPCUtil.getRemoteException("Container "
- + startedContainer.containerId.toString() + " is already started");
+ StartedContainer previousStartedContainer = startedContainers
+ .putIfAbsent(startedContainer.containerId, startedContainer);
+
+ if (previousStartedContainer != null) {
+ if(!previousStartedContainer.allowsRestart()) {
+ throw RPCUtil.getRemoteException("Container "
+ + startedContainer.containerId.toString() + " is already started");
+ }
+ return previousStartedContainer;
}
+
+ return startedContainer;
}
@Override
@@ -181,49 +199,58 @@ private void addStartingContainer(StartedContainer startedContainer)
// between startContainer and stopContainer only when startContainer is
// in progress for a given container.
StartedContainer startingContainer =
- new StartedContainer(container.getId(), container.getNodeId());
+ new StartedContainer(container.getId(), container.getNodeId(), false);
synchronized (startingContainer) {
- addStartingContainer(startingContainer);
-
- Map allServiceResponse;
- ContainerManagementProtocolProxyData proxy = null;
- try {
- proxy =
- cmProxy.getProxy(container.getNodeId().toString(),
- container.getId());
- StartContainerRequest scRequest =
- StartContainerRequest.newInstance(containerLaunchContext,
- container.getContainerToken());
- List list = new ArrayList();
- list.add(scRequest);
- StartContainersRequest allRequests =
- StartContainersRequest.newInstance(list);
- StartContainersResponse response =
- proxy
- .getContainerManagementProtocol().startContainers(allRequests);
- if (response.getFailedRequests() != null
- && response.getFailedRequests().containsKey(container.getId())) {
- Throwable t =
- response.getFailedRequests().get(container.getId()).deSerialize();
- parseAndThrowException(t);
- }
- allServiceResponse = response.getAllServicesMetaData();
- startingContainer.state = ContainerState.RUNNING;
- } catch (YarnException | IOException e) {
- startingContainer.state = ContainerState.COMPLETE;
- // Remove the started container if it failed to start
- startedContainers.remove(startingContainer.containerId);
- throw e;
- } catch (Throwable t) {
- startingContainer.state = ContainerState.COMPLETE;
- startedContainers.remove(startingContainer.containerId);
- throw RPCUtil.getRemoteException(t);
- } finally {
- if (proxy != null) {
- cmProxy.mayBeCloseProxy(proxy);
+ startingContainer = addStartingContainer(startingContainer);
+ // Do synchronization on the actual StartedContainer stored in
+ // startedContainers map to prevent race condition between startContainer
+ // and initializeContainer when multiple container start/init are allowed
+ synchronized (startingContainer) {
+
+ Map allServiceResponse;
+ ContainerManagementProtocolProxyData proxy = null;
+ try {
+ proxy =
+ cmProxy.getProxy(container.getNodeId().toString(),
+ container.getId());
+ StartContainerRequest scRequest =
+ StartContainerRequest.newInstance(containerLaunchContext,
+ container.getContainerToken());
+ List list = new ArrayList();
+ list.add(scRequest);
+ StartContainersRequest allRequests =
+ StartContainersRequest.newInstance(list);
+ StartContainersResponse response =
+ proxy
+ .getContainerManagementProtocol().startContainers(allRequests);
+ if (response.getFailedRequests() != null
+ && response.getFailedRequests().containsKey(container.getId())) {
+ Throwable t =
+ response.getFailedRequests().get(container.getId()).deSerialize();
+ parseAndThrowException(t);
+ }
+ allServiceResponse = response.getAllServicesMetaData();
+
+ startingContainer.state = ContainerState.RUNNING;
+ } catch (YarnException | IOException e) {
+ if(!startingContainer.allowsRestart()) {
+ startingContainer.state = ContainerState.COMPLETE;
+ // Remove the started container if it failed to start
+ startedContainers.remove(startingContainer.containerId);
+ }
+ // TODO better handling of exceptions when multiStart is enabled
+ throw e;
+ } catch (Throwable t) {
+ startingContainer.state = ContainerState.COMPLETE;
+ startedContainers.remove(startingContainer.containerId);
+ throw RPCUtil.getRemoteException(t);
+ } finally {
+ if (proxy != null) {
+ cmProxy.mayBeCloseProxy(proxy);
+ }
}
+ return allServiceResponse;
}
- return allServiceResponse;
}
}
@@ -269,9 +296,12 @@ public void stopContainer(ContainerId containerId, NodeId nodeId)
return;
}
stopContainerInternal(containerId, nodeId);
- // Only after successful
- startedContainer.state = ContainerState.COMPLETE;
- startedContainers.remove(startedContainer.containerId);
+ if(startedContainer.allowsRestart()) {
+ startedContainer.state = ContainerState.AWAITING_START;
+ } else {
+ startedContainer.state = ContainerState.COMPLETE;
+ startedContainers.remove(startedContainer.containerId);
+ }
}
} else {
stopContainerInternal(containerId, nodeId);
@@ -306,6 +336,89 @@ public ContainerStatus getContainerStatus(ContainerId containerId,
}
}
+ @Override
+ public Map initializeContainer(Container container,
+ ContainerLaunchContext containerLaunchContext)
+ throws YarnException, IOException {
+
+ StartedContainer startingContainer =
+ new StartedContainer(container.getId(), container.getNodeId(), true);
+ synchronized (startingContainer) {
+ startingContainer = addStartingContainer(startingContainer);
+ // Do synchronization on the actual StartedContainer stored in
+ // startedContainers map to prevent race condition between startContainer
+ // and initializeContainer
+ synchronized (startingContainer) {
+
+ if (!startingContainer.allowsRestart()) {
+ throw RPCUtil.getRemoteException(
+ "Container " + startingContainer.containerId.toString()
+ + " already initialized and does not allow reinitialization");
+ }
+
+ Map allServiceResponse;
+ ContainerManagementProtocolProxyData proxy = null;
+ try {
+ proxy = cmProxy.getProxy(container.getNodeId().toString(),
+ container.getId());
+ InitializeContainerRequest icRequest =
+ InitializeContainerRequest.newInstance(containerLaunchContext,
+ container.getContainerToken());
+ List list =
+ new ArrayList();
+ list.add(icRequest);
+ InitializeContainersRequest allRequests =
+ InitializeContainersRequest.newInstance(list);
+ InitializeContainersResponse response =
+ proxy.getContainerManagementProtocol()
+ .initializeContainers(allRequests);
+ if (response.getFailedRequests() != null
+ && response.getFailedRequests().containsKey(container.getId())) {
+ Throwable t = response.getFailedRequests().get(container.getId())
+ .deSerialize();
+ parseAndThrowException(t);
+ }
+ allServiceResponse = response.getAllServicesMetaData();
+
+ startingContainer.state = ContainerState.AWAITING_START;
+ } catch (YarnException | IOException e) {
+ startingContainer.state = ContainerState.COMPLETE;
+ // Remove the started container if it failed to initialize
+ // TODO check if this behavior is correct
+ startedContainers.remove(startingContainer.containerId);
+ throw e;
+ } catch (Throwable t) {
+ startingContainer.state = ContainerState.COMPLETE;
+ startedContainers.remove(startingContainer.containerId);
+ throw RPCUtil.getRemoteException(t);
+ } finally {
+ if (proxy != null) {
+ cmProxy.mayBeCloseProxy(proxy);
+ }
+ }
+ return allServiceResponse;
+ }
+ }
+ }
+
+ @Override
+ public void destroyContainer(ContainerId containerId, NodeId nodeId)
+ throws YarnException, IOException {
+ StartedContainer startedContainer = startedContainers.get(containerId);
+
+ // Destroy requests for a tracked container are serialized by locking on its
+ // StartedContainer entry; an untracked container is destroyed directly.
+ if (startedContainer != null) {
+ synchronized (startedContainer) {
+ destroyContainerInternal(containerId, nodeId);
+ startedContainer.state = ContainerState.COMPLETE;
+ startedContainers.remove(startedContainer.containerId);
+ }
+ } else {
+ destroyContainerInternal(containerId, nodeId);
+ }
+ }
+
private void stopContainerInternal(ContainerId containerId, NodeId nodeId)
throws IOException, YarnException {
ContainerManagementProtocolProxyData proxy = null;
@@ -329,6 +442,29 @@ private void stopContainerInternal(ContainerId containerId, NodeId nodeId)
}
}
+ private void destroyContainerInternal(ContainerId containerId, NodeId nodeId)
+ throws IOException, YarnException {
+ ContainerManagementProtocolProxyData proxy = null;
+ List containerIds = new ArrayList();
+ containerIds.add(containerId);
+ try {
+ proxy = cmProxy.getProxy(nodeId.toString(), containerId);
+ DestroyContainersResponse response =
+ proxy.getContainerManagementProtocol().destroyContainers(
+ DestroyContainersRequest.newInstance(containerIds));
+ if (response.getFailedRequests() != null
+ && response.getFailedRequests().containsKey(containerId)) {
+ Throwable t =
+ response.getFailedRequests().get(containerId).deSerialize();
+ parseAndThrowException(t);
+ }
+ } finally {
+ if (proxy != null) {
+ cmProxy.mayBeCloseProxy(proxy);
+ }
+ }
+ }
+
public AtomicBoolean getCleanupRunningContainers() {
return cleanupRunningContainers;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagementProtocolPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagementProtocolPBClientImpl.java
index ce18bde..e4975b3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagementProtocolPBClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagementProtocolPBClientImpl.java
@@ -32,6 +32,10 @@
import org.apache.hadoop.yarn.api.ContainerManagementProtocolPB;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.InitializeContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.InitializeContainersResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.DestroyContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.DestroyContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
@@ -40,6 +44,10 @@
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.IncreaseContainersResourceRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.IncreaseContainersResourceResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.InitializeContainersRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.InitializeContainersResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.DestroyContainersRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.DestroyContainersResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerStatusesRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerStatusesResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainersRequestPBImpl;
@@ -49,10 +57,12 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.DestroyContainersRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerStatusesRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainersRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.StopContainersRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.IncreaseContainersResourceRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.InitializeContainersRequestProto;
import com.google.protobuf.ServiceException;
@@ -148,4 +158,33 @@ public IncreaseContainersResourceResponse increaseContainersResource(
return null;
}
}
+
+  @Override
+  public InitializeContainersResponse initializeContainers(
+      InitializeContainersRequest requests) throws YarnException, IOException {
+    // Unwrap the protobuf message first, then forward it over the PB proxy.
+    final InitializeContainersRequestProto wireRequest =
+        ((InitializeContainersRequestPBImpl) requests).getProto();
+    try {
+      return new InitializeContainersResponsePBImpl(
+          proxy.initializeContainers(null, wireRequest));
+    } catch (ServiceException rpcFailure) {
+      RPCUtil.unwrapAndThrowException(rpcFailure);
+      return null; // reached only if unwrapAndThrowException returns normally
+    }
+  }
+
+  @Override
+  public DestroyContainersResponse destroyContainers(
+      DestroyContainersRequest requests) throws YarnException, IOException {
+    final DestroyContainersRequestProto wireRequest =
+        ((DestroyContainersRequestPBImpl) requests).getProto();
+    try { // forward the unwrapped protobuf message over the PB proxy
+      return new DestroyContainersResponsePBImpl(
+          proxy.destroyContainers(null, wireRequest));
+    } catch (ServiceException rpcFailure) {
+      RPCUtil.unwrapAndThrowException(rpcFailure);
+      return null; // reached only if unwrapAndThrowException returns normally
+    }
+  }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ContainerManagementProtocolPBServiceImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ContainerManagementProtocolPBServiceImpl.java
index 7626441..c5e6888 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ContainerManagementProtocolPBServiceImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ContainerManagementProtocolPBServiceImpl.java
@@ -24,11 +24,17 @@
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.ContainerManagementProtocolPB;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.InitializeContainersResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.DestroyContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.IncreaseContainersResourceRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.IncreaseContainersResourceResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.InitializeContainersRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.InitializeContainersResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.DestroyContainersRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.DestroyContainersResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerStatusesRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerStatusesResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainersRequestPBImpl;
@@ -38,6 +44,10 @@
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.IncreaseContainersResourceRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.IncreaseContainersResourceResponseProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.InitializeContainersRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.InitializeContainersResponseProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.DestroyContainersRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.DestroyContainersResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerStatusesRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerStatusesResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainersRequestProto;
@@ -116,4 +126,32 @@ public IncreaseContainersResourceResponseProto increaseContainersResource(
throw new ServiceException(e);
}
}
+
+  @Override
+  public InitializeContainersResponseProto initializeContainers(RpcController arg0,
+      InitializeContainersRequestProto proto) throws ServiceException {
+    try { // delegate to the wrapped implementation; rewrap its checked failures
+      final InitializeContainersResponse reply = real.initializeContainers(
+          new InitializeContainersRequestPBImpl(proto));
+      return ((InitializeContainersResponsePBImpl) reply).getProto();
+    } catch (YarnException yarnFailure) {
+      throw new ServiceException(yarnFailure);
+    } catch (IOException ioFailure) {
+      throw new ServiceException(ioFailure);
+    }
+  }
+
+  @Override
+  public DestroyContainersResponseProto destroyContainers(RpcController arg0,
+      DestroyContainersRequestProto proto) throws ServiceException {
+    try { // delegate to the wrapped implementation; rewrap its checked failures
+      final DestroyContainersResponse reply =
+          real.destroyContainers(new DestroyContainersRequestPBImpl(proto));
+      return ((DestroyContainersResponsePBImpl) reply).getProto();
+    } catch (YarnException yarnFailure) {
+      throw new ServiceException(yarnFailure);
+    } catch (IOException ioFailure) {
+      throw new ServiceException(ioFailure);
+    }
+  }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java
index 0a19783..1660f17 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java
@@ -33,6 +33,10 @@
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.InitializeContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.InitializeContainersResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.DestroyContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.DestroyContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
@@ -174,5 +178,17 @@ public IncreaseContainersResourceResponse increaseContainersResource(
IncreaseContainersResourceRequest request) throws YarnException, IOException {
return null;
}
+
+ @Override
+ public InitializeContainersResponse initializeContainers(
+ InitializeContainersRequest request) throws YarnException, IOException {
+ return null; // no-op stub: the request is ignored and no response is built
+ }
+
+ @Override
+ public DestroyContainersResponse destroyContainers(
+ DestroyContainersRequest request) throws YarnException, IOException {
+ return null; // no-op stub: the request is ignored and no response is built
+ }
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerResourceIncreaseRPC.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerResourceIncreaseRPC.java
index 50ff1e0..4385275 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerResourceIncreaseRPC.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerResourceIncreaseRPC.java
@@ -25,10 +25,14 @@
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.DestroyContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.DestroyContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.InitializeContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.InitializeContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
@@ -158,5 +162,17 @@ public IncreaseContainersResourceResponse increaseContainersResource(
}
throw new YarnException("Shouldn't happen!!");
}
+
+ @Override
+ public InitializeContainersResponse initializeContainers(
+ InitializeContainersRequest request) throws YarnException, IOException {
+ return null; // no-op stub: the request is ignored and no response is built
+ }
+
+ @Override
+ public DestroyContainersResponse destroyContainers(
+ DestroyContainersRequest request) throws YarnException, IOException {
+ return null; // no-op stub: the request is ignored and no response is built
+ }
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java
index e718661..f589fe1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java
@@ -35,6 +35,10 @@
import org.apache.hadoop.yarn.api.ContainerManagementProtocolPB;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.InitializeContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.InitializeContainersResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.DestroyContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.DestroyContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
@@ -227,6 +231,18 @@ public IncreaseContainersResourceResponse increaseContainersResource(
IncreaseContainersResourceRequest request) throws YarnException, IOException {
return null;
}
+
+ @Override
+ public InitializeContainersResponse initializeContainers(
+ InitializeContainersRequest request) throws YarnException, IOException {
+ return null; // no-op stub: the request is ignored and no response is built
+ }
+
+ @Override
+ public DestroyContainersResponse destroyContainers(
+ DestroyContainersRequest request) throws YarnException, IOException {
+ return null; // no-op stub: the request is ignored and no response is built
+ }
}
public static ContainerTokenIdentifier newContainerTokenIdentifier(
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NMAuditLogger.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NMAuditLogger.java
index cb4021f..67249e4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NMAuditLogger.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NMAuditLogger.java
@@ -43,8 +43,10 @@
static final char PAIR_SEPARATOR = '\t';
// Some commonly used descriptions
+ public static final String INITIALIZE_CONTAINER = "Initialize Container Request";
public static final String START_CONTAINER = "Start Container Request";
public static final String STOP_CONTAINER = "Stop Container Request";
+ public static final String DESTROY_CONTAINER = "Destroy Container Request";
public static final String FINISH_SUCCESS_CONTAINER = "Container Finished - Succeeded";
public static final String FINISH_FAILED_CONTAINER = "Container Finished - Failed";
public static final String FINISH_KILLED_CONTAINER = "Container Finished - Killed";
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index c0f02e9..0cb55ca 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -527,6 +527,11 @@ private ResourceUtilization getNodeUtilization() {
// subsequent call to stop container it will get removed from cache.
addCompletedContainer(containerId);
} else {
+
+ // Report containers in AWAITING_START to the ResourceManager as RUNNING
+ if(containerStatus.getState() == ContainerState.AWAITING_START) {
+ containerStatus.setState(ContainerState.RUNNING);
+ }
containerStatuses.add(containerStatus);
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index 162823c..d8c60aa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -56,10 +56,15 @@
import org.apache.hadoop.service.ServiceStateChangeListener;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.DestroyContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.DestroyContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.InitializeContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.InitializeContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.InitializeContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
@@ -120,7 +125,10 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerInitEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerStartEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerStopEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.SignalContainersLauncherEvent;
@@ -357,7 +365,12 @@ private void recoverContainer(RecoveredContainerState rcs)
credentials, metrics, token, context, rcs);
context.getContainers().put(containerId, container);
dispatcher.getEventHandler().handle(
- new ApplicationContainerInitEvent(container));
+ // For containers that do not allow multiple starts, the startAfterInit
+ // flag has no effect during recovery. However, we set startAfterInit
+ // to true in order to prevent the application master from restarting
+ // or reinitializing the container until we have checked whether the
+ // container is already running.
+ new ApplicationContainerInitEvent(container, true));
} else {
if (rcs.getStatus() != RecoveredContainerStatus.COMPLETED) {
LOG.warn(containerId + " has no corresponding application!");
@@ -704,7 +717,7 @@ protected void authorizeUser(UserGroupInformation remoteUgi,
protected void authorizeStartAndResourceIncreaseRequest(
NMTokenIdentifier nmTokenIdentifier,
ContainerTokenIdentifier containerTokenIdentifier,
- boolean startRequest)
+ boolean startRequest, boolean isRestart)
throws YarnException {
if (nmTokenIdentifier == null) {
throw RPCUtil.getRemoteException(INVALID_NMTOKEN_MSG);
@@ -734,8 +747,9 @@ protected void authorizeStartAndResourceIncreaseRequest(
+ "container with container token")
.append(" issued for application attempt : ")
.append(containerId.getApplicationAttemptId());
- } else if (startRequest && !this.context.getContainerTokenSecretManager()
- .isValidStartContainerRequest(containerTokenIdentifier)) {
+ } else if (startRequest && !isRestart
+ && !this.context.getContainerTokenSecretManager()
+ .isValidStartContainerRequest(containerTokenIdentifier)) {
// Is the container being relaunched? Or RPC layer let startCall with
// tokens generated off old-secret through?
unauthorized = true;
@@ -803,16 +817,28 @@ public StartContainersResponse startContainers(
containerTokenIdentifier);
containerId = containerTokenIdentifier.getContainerID();
+ boolean isRestart = false;
+ Container containerFromContext = this.context.getContainers().get(containerId);
+ if(containerFromContext != null) {
+ isRestart = containerFromContext.allowsRestart();
+ }
+
// Initialize the AMRMProxy service instance only if the container is of
// type AM and if the AMRMProxy service is enabled
- if (amrmProxyEnabled && containerTokenIdentifier.getContainerType()
+ if (!isRestart && amrmProxyEnabled && containerTokenIdentifier.getContainerType()
.equals(ContainerType.APPLICATION_MASTER)) {
this.getAMRMProxyService().processApplicationStartRequest(request);
}
- performContainerPreStartChecks(nmTokenIdentifier, request,
- containerTokenIdentifier);
- startContainerInternal(nmTokenIdentifier, containerTokenIdentifier,
- request);
+ performContainerPreStartChecks(nmTokenIdentifier,
+ request.getContainerLaunchContext(), containerTokenIdentifier,
+ isRestart);
+ if(isRestart) {
+ restartContainerInternal(nmTokenIdentifier, containerTokenIdentifier,
+ request, containerFromContext);
+ } else {
+ startContainerInternal(nmTokenIdentifier, containerTokenIdentifier,
+ request);
+ }
succeededContainers.add(containerId);
} catch (YarnException e) {
failedContainers.put(containerId, SerializedException.newInstance(e));
@@ -831,8 +857,8 @@ public StartContainersResponse startContainers(
}
private void performContainerPreStartChecks(
- NMTokenIdentifier nmTokenIdentifier, StartContainerRequest request,
- ContainerTokenIdentifier containerTokenIdentifier)
+ NMTokenIdentifier nmTokenIdentifier, ContainerLaunchContext launchContext,
+ ContainerTokenIdentifier containerTokenIdentifier, boolean isRestart)
throws YarnException, InvalidToken {
/*
* 1) It should save the NMToken into NMTokenSecretManager. This is done
@@ -847,25 +873,55 @@ private void performContainerPreStartChecks(
* correct RMIdentifier. d) It is not expired.
*/
authorizeStartAndResourceIncreaseRequest(
- nmTokenIdentifier, containerTokenIdentifier, true);
+ nmTokenIdentifier, containerTokenIdentifier, true, isRestart);
// update NMToken
updateNMTokenIdentifier(nmTokenIdentifier);
- ContainerLaunchContext launchContext = request.getContainerLaunchContext();
-
- Map serviceData = getAuxServiceMetaData();
- if (launchContext.getServiceData()!=null &&
- !launchContext.getServiceData().isEmpty()) {
- for (Entry meta : launchContext.getServiceData()
- .entrySet()) {
- if (null == serviceData.get(meta.getKey())) {
- throw new InvalidAuxServiceException("The auxService:" + meta.getKey()
- + " does not exist");
+ // Launch context could be null in case of a restart without reinitialization
+ if(launchContext != null || !isRestart) {
+ Map serviceData = getAuxServiceMetaData();
+ if (launchContext.getServiceData()!=null &&
+ !launchContext.getServiceData().isEmpty()) {
+ for (Entry meta : launchContext.getServiceData()
+ .entrySet()) {
+ if (null == serviceData.get(meta.getKey())) {
+ throw new InvalidAuxServiceException("The auxService:" + meta.getKey()
+ + " does not exist");
+ }
}
}
}
}
+  /**
+   * Validates the destroy delay carried by an initialize request.
+   * @return the requested delay when positive, or the configured
+   *         NM_CONTAINER_DESTROY_DELAY_MS value when the request carries 0
+   * @throws YarnException if the requested delay is negative (also audited)
+   */
+  private long verifyAndGetDestroyDelay(InitializeContainerRequest request,
+      ContainerTokenIdentifier containerTokenIdentifier) throws YarnException {
+    final ContainerId cid = containerTokenIdentifier.getContainerID();
+    final String cidText = cid.toString();
+    final String submitter = containerTokenIdentifier.getApplicationSubmitter();
+    final ApplicationId appId =
+        cid.getApplicationAttemptId().getApplicationId();
+    final long requested = request.getDestroyDelay();
+    if (requested < 0) {
+      NMAuditLogger.logFailure(submitter, AuditConstants.INITIALIZE_CONTAINER,
+          "ContainerManagerImpl", "Invalid destroy delay value", appId, cid);
+      throw RPCUtil.getRemoteException(
+          "Container " + cidText + " invalid destroy delay value");
+    }
+    if (requested > 0) {
+      return requested;
+    }
+    // zero: fall back to the configured NM_CONTAINER_DESTROY_DELAY_MS value
+    return this.getConfig().getLong(
+        YarnConfiguration.NM_CONTAINER_DESTROY_DELAY_MS,
+        YarnConfiguration.DEFAULT_NM_CONTAINER_DESTROY_DELAY_MS);
+  }
+
private ContainerManagerApplicationProto buildAppProto(ApplicationId appId,
String user, Credentials credentials,
Map appAcls,
@@ -961,7 +1017,7 @@ protected void startContainerInternal(NMTokenIdentifier nmTokenIdentifier,
this.context.getNMStateStore().storeContainer(containerId, request);
dispatcher.getEventHandler().handle(
- new ApplicationContainerInitEvent(container));
+ new ApplicationContainerInitEvent(container, true));
this.context.getContainerTokenSecretManager().startContainerSuccessful(
containerTokenIdentifier);
@@ -981,6 +1037,171 @@ protected void startContainerInternal(NMTokenIdentifier nmTokenIdentifier,
}
}
+  /**
+   * Re-initializes an already registered container: dispatches a
+   * ContainerInitEvent with flag false (presumably "do not start" -- confirm).
+   * @param nmTokenIdentifier not read by this method
+   * @param containerTokenIdentifier supplies the container id and the user
+   * @param request supplies the launch context and the container token
+   * @param previousContainer the container currently registered for that id
+   * @throws YarnException if the container does not allow restart or is not
+   *         awaiting start; either rejection is also written to the audit log
+   */
+  @SuppressWarnings("unchecked")
+  protected void reinitializeContainerInternal(
+      NMTokenIdentifier nmTokenIdentifier,
+      ContainerTokenIdentifier containerTokenIdentifier,
+      InitializeContainerRequest request, Container previousContainer)
+      throws YarnException, IOException {
+    final ContainerId cid = containerTokenIdentifier.getContainerID();
+    final String cidText = cid.toString();
+    final String submitter = containerTokenIdentifier.getApplicationSubmitter();
+    final ApplicationId appId =
+        cid.getApplicationAttemptId().getApplicationId();
+    LOG.info("Initialize request for " + cidText + " by user " + submitter);
+    LOG.info("Reinitializing container " + cidText);
+    String refusal = null;
+    if (!previousContainer.allowsRestart()) {
+      refusal = "does not allow reinitialization!";
+    } else if (!previousContainer.isAwaitingStart()) {
+      refusal = "is not awaiting for reinitialization";
+    }
+    if (refusal != null) {
+      NMAuditLogger.logFailure(submitter, AuditConstants.INITIALIZE_CONTAINER,
+          "ContainerManagerImpl", "Container " + refusal, appId, cid);
+      throw RPCUtil.getRemoteException("Container " + cidText + " " + refusal);
+    }
+    final StartContainerRequest initRequest = StartContainerRequest.newInstance(
+        request.getContainerLaunchContext(), request.getContainerToken());
+    dispatcher.getEventHandler().handle(
+        new ContainerInitEvent(cid, false, initRequest));
+    NMAuditLogger.logSuccess(submitter, AuditConstants.INITIALIZE_CONTAINER,
+        "ContainerManageImpl", appId, cid);
+  }
+
+ @SuppressWarnings("unchecked")
+ protected void initializeContainerInternal(NMTokenIdentifier nmTokenIdentifier,
+ ContainerTokenIdentifier containerTokenIdentifier,
+ InitializeContainerRequest request) throws YarnException, IOException {
+ // Register a new container for the token's id and initialize it (init flag false below).
+ ContainerId containerId = containerTokenIdentifier.getContainerID();
+ String containerIdStr = containerId.toString();
+ String user = containerTokenIdentifier.getApplicationSubmitter();
+
+ ContainerLaunchContext launchContext = request.getContainerLaunchContext();
+
+ Credentials credentials =
+ YarnServerSecurityUtils.parseCredentials(launchContext);
+
+ ApplicationId applicationID =
+ containerId.getApplicationAttemptId().getApplicationId();
+
+ LOG.info("Initialize request for " + containerIdStr + " by user " + user);
+ LOG.info("Initializing new container " + containerIdStr);
+ // verifyAndGetDestroyDelay() throws for a negative delay, before anything is registered.
+ Container container = new ContainerImpl(getConfig(), this.dispatcher,
+ launchContext, credentials, metrics, containerTokenIdentifier, context,
+ verifyAndGetDestroyDelay(request, containerTokenIdentifier));
+
+ // The caller has already verified that the container is not present;
+ // coordination for new container insertion is guaranteed by the lock on this.context.
+ context.getContainers().put(containerId, container);
+ // NOTE(review): this put precedes the isServiceStopped() check; confirm cleanup if that throws.
+ this.readLock.lock();
+ try {
+ if (!isServiceStopped()) {
+ // Create the application
+ Application application = new ApplicationImpl(dispatcher, user,
+ applicationID, credentials, context);
+ if (null == context.getApplications().putIfAbsent(applicationID,
+ application)) {
+ LOG.info("Creating a new application reference for app "
+ + applicationID);
+ LogAggregationContext logAggregationContext =
+ containerTokenIdentifier.getLogAggregationContext();
+ Map appAcls =
+ container.getLaunchContext().getApplicationACLs();
+ context.getNMStateStore().storeApplication(applicationID,
+ buildAppProto(applicationID, user, credentials, appAcls,
+ logAggregationContext));
+ dispatcher.getEventHandler().handle(
+ new ApplicationInitEvent(applicationID, appAcls,
+ logAggregationContext));
+ }
+
+ StartContainerRequest startRequest = StartContainerRequest
+ .newInstance(request.getContainerLaunchContext(),
+ request.getContainerToken());
+ // NOTE(review): the raw request delay is stored, not the verifyAndGetDestroyDelay() result; confirm.
+ // TODO: make a single call to the state store
+ this.context.getNMStateStore().storeContainerDestroyDelay(containerId,
+ request.getDestroyDelay());
+ this.context.getNMStateStore().storeContainer(containerId, startRequest);
+ dispatcher.getEventHandler().handle(
+ new ApplicationContainerInitEvent(container, false));
+ // NOTE(review): startContainers() notifies the AMRMProxy before its pre-start checks; here it follows the init event.
+ if (amrmProxyEnabled && containerTokenIdentifier.getContainerType()
+ .equals(ContainerType.APPLICATION_MASTER)) {
+ this.getAMRMProxyService().processApplicationStartRequest(startRequest);
+ }
+
+ this.context.getContainerTokenSecretManager().startContainerSuccessful(
+ containerTokenIdentifier);
+ NMAuditLogger.logSuccess(user, AuditConstants.INITIALIZE_CONTAINER,
+ "ContainerManageImpl", applicationID, containerId);
+
+ } else {
+ throw new YarnException(
+ "Container initialization failed as the NodeManager is " +
+ "in the process of shutting down");
+ }
+ } finally {
+ this.readLock.unlock();
+ }
+
+ }
+
+  /**
+   * Restarts a registered container: ContainerStartEvent when the request has
+   * no launch context, otherwise ContainerInitEvent (flag true) with the request.
+   * @param nmTokenIdentifier not read by this method
+   * @param containerTokenIdentifier supplies the user and the resource
+   * @param request start request; its launch context may be null
+   * @param container the registered container to restart
+   * @throws YarnException if the container is not awaiting start (audited)
+   */
+  @SuppressWarnings("unchecked")
+  protected void restartContainerInternal(NMTokenIdentifier nmTokenIdentifier,
+      ContainerTokenIdentifier containerTokenIdentifier,
+      StartContainerRequest request, Container container)
+      throws YarnException, IOException {
+    final ContainerId cid = container.getContainerId();
+    final String cidText = cid.toString();
+    final String submitter = containerTokenIdentifier.getApplicationSubmitter();
+    final ApplicationId appId = cid.getApplicationAttemptId().getApplicationId();
+    LOG.info("Start request for " + cidText + " by user " + submitter);
+    if (!container.isAwaitingStart()) {
+      NMAuditLogger.logFailure(submitter, AuditConstants.START_CONTAINER,
+          "ContainerManagerImpl", "Container is not awaiting for start",
+          appId, cid);
+      throw RPCUtil.getRemoteException(
+          "Container " + cidText + " is not awaiting for start");
+    }
+    if (request.getContainerLaunchContext() != null) {
+      LOG.info("Reinitializing and starting container " + cidText);
+      dispatcher.getEventHandler().handle(new ContainerInitEvent(cid, true, request));
+    } else {
+      LOG.info("Restarting container " + cidText);
+      dispatcher.getEventHandler().handle(new ContainerStartEvent(cid));
+    }
+    NMAuditLogger.logSuccess(submitter, AuditConstants.START_CONTAINER,
+        "ContainerManageImpl", appId, cid);
+    // TODO launchedContainer misplaced -> doesn't necessarily mean a container
+    // launch. A finished Application will not launch containers.
+    metrics.launchedContainer();
+    metrics.allocateContainer(containerTokenIdentifier.getResource());
+  }
+
protected ContainerTokenIdentifier verifyAndGetContainerTokenIdentifier(
org.apache.hadoop.yarn.api.records.Token token,
ContainerTokenIdentifier containerTokenIdentifier) throws YarnException,
@@ -1036,7 +1257,7 @@ public IncreaseContainersResourceResponse increaseContainersResource(
verifyAndGetContainerTokenIdentifier(token,
containerTokenIdentifier);
authorizeStartAndResourceIncreaseRequest(
- nmTokenIdentifier, containerTokenIdentifier, false);
+ nmTokenIdentifier, containerTokenIdentifier, false, false);
containerId = containerTokenIdentifier.getContainerID();
// Reuse the startContainer logic to update NMToken,
// as container resource increase request will have come with
@@ -1166,7 +1387,11 @@ public StopContainersResponse stopContainers(StopContainersRequest requests)
try {
Container container = this.context.getContainers().get(id);
authorizeGetAndStopContainerRequest(id, container, true, identifier);
- stopContainerInternal(id);
+        if (container != null && container.allowsRestart()) {
+          stopContainerInternal(id);
+        } else { // not restartable, or id unknown to this NM (container == null)
+          destroyContainerInternal(id);
+        }
succeededRequests.add(id);
} catch (YarnException e) {
failedRequests.put(id, SerializedException.newInstance(e));
@@ -1189,6 +1414,62 @@ protected void stopContainerInternal(ContainerId containerID)
+ " is not handled by this NodeManager");
}
} else {
+
+ // TODO: check container state before sending stop to avoid invalid
+ // event exceptions in state machine
+
+ dispatcher.getEventHandler()
+ .handle(new ContainerStopEvent(containerID));
+
+ NMAuditLogger.logSuccess(container.getUser(),
+ AuditConstants.STOP_CONTAINER, "ContainerManageImpl", containerID
+ .getApplicationAttemptId().getApplicationId(), containerID);
+ }
+ }
+
+ /**
+  * Destroy a list of containers running on this NodeManager.
+  *
+  * Each container id is authorized and destroyed independently: a
+  * YarnException raised for one container is recorded in the failed map of
+  * the response and does not abort the remaining containers.
+  *
+  * @param requests request carrying the ids of the containers to destroy
+  * @return response listing the container ids that were destroyed and the
+  *         serialized exception for each container that failed
+  * @throws YarnException if no NMToken identifier can be selected for the
+  *         remote caller
+  * @throws IOException propagated from destroyContainerInternal
+  */
+ @Override
+ public DestroyContainersResponse destroyContainers(
+     DestroyContainersRequest requests) throws YarnException, IOException {
+
+   List<ContainerId> succeededRequests = new ArrayList<ContainerId>();
+   Map<ContainerId, SerializedException> failedRequests =
+       new HashMap<ContainerId, SerializedException>();
+   UserGroupInformation remoteUgi = getRemoteUgi();
+   NMTokenIdentifier identifier = selectNMTokenIdentifier(remoteUgi);
+   if (identifier == null) {
+     throw RPCUtil.getRemoteException(INVALID_NMTOKEN_MSG);
+   }
+   for (ContainerId id : requests.getContainerIds()) {
+     try {
+       Container container = this.context.getContainers().get(id);
+       authorizeGetAndStopContainerRequest(id, container, true, identifier);
+       destroyContainerInternal(id);
+       succeededRequests.add(id);
+     } catch (YarnException e) {
+       // Per-container failure: report it in the response and keep going.
+       failedRequests.put(id, SerializedException.newInstance(e));
+     }
+   }
+   return DestroyContainersResponse
+       .newInstance(succeededRequests, failedRequests);
+ }
+
+ @SuppressWarnings("unchecked")
+ protected void destroyContainerInternal(ContainerId containerID)
+ throws YarnException, IOException {
+ String containerIDStr = containerID.toString();
+ Container container = this.context.getContainers().get(containerID);
+ LOG.info("Destroying container with container Id: " + containerIDStr);
+
+ if (container == null) {
+ if (!nodeStatusUpdater.isContainerRecentlyStopped(containerID)) {
+ throw RPCUtil.getRemoteException("Container " + containerIDStr
+ + " is not handled by this NodeManager");
+ }
+ } else {
+
context.getNMStateStore().storeContainerKilled(containerID);
dispatcher.getEventHandler().handle(
new ContainerKillEvent(containerID,
@@ -1196,7 +1477,7 @@ protected void stopContainerInternal(ContainerId containerID)
"Container killed by the ApplicationMaster."));
NMAuditLogger.logSuccess(container.getUser(),
- AuditConstants.STOP_CONTAINER, "ContainerManageImpl", containerID
+ AuditConstants.DESTROY_CONTAINER, "ContainerManageImpl", containerID
.getApplicationAttemptId().getApplicationId(), containerID);
}
}
@@ -1286,6 +1567,75 @@ protected void authorizeGetAndStopContainerRequest(ContainerId containerId,
}
}
+ /**
+  * Initialize (or reinitialize) a batch of containers on this NodeManager.
+  *
+  * Requests are rejected with an NMNotYetReadyException while
+  * blockNewContainerRequests is set. For every request the container token
+  * is verified and the pre-start checks are run; a container id that is
+  * already present in the NM context is reinitialized, any other id is
+  * initialized as a new container.
+  *
+  * Error handling per request: a YarnException is recorded in the failed
+  * map and processing continues; an InvalidToken is recorded and rethrown;
+  * any other IOException (including a missing container token) is rethrown
+  * wrapped by RPCUtil.getRemoteException.
+  *
+  * @param requests the batch of container initialization requests
+  * @return response carrying the auxiliary service metadata, the ids of
+  *         the containers that succeeded and the per-container failures
+  */
+ @Override
+ public InitializeContainersResponse initializeContainers(
+     InitializeContainersRequest requests) throws YarnException, IOException {
+   if (blockNewContainerRequests.get()) {
+     throw new NMNotYetReadyException(
+         "Rejecting new containers as NodeManager has not"
+             + " yet connected with ResourceManager");
+   }
+   UserGroupInformation remoteUgi = getRemoteUgi();
+   NMTokenIdentifier nmTokenIdentifier = selectNMTokenIdentifier(remoteUgi);
+   authorizeUser(remoteUgi, nmTokenIdentifier);
+   List<ContainerId> succeededContainers = new ArrayList<ContainerId>();
+   Map<ContainerId, SerializedException> failedContainers =
+       new HashMap<ContainerId, SerializedException>();
+   // Synchronize with NodeStatusUpdaterImpl#registerWithRM
+   // to avoid race condition during NM-RM resync (due to RM restart) while a
+   // container is being started, in particular when the container has not yet
+   // been added to the containers map in NMContext.
+   synchronized (this.context) {
+     for (InitializeContainerRequest request : requests
+         .getInitializeContainerRequests()) {
+       // Stays null until the token has been decoded; failures before that
+       // point are therefore recorded under a null key.
+       ContainerId containerId = null;
+       try {
+         if (request.getContainerToken() == null
+             || request.getContainerToken().getIdentifier() == null) {
+           throw new IOException(INVALID_CONTAINERTOKEN_MSG);
+         }
+
+         ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils
+             .newContainerTokenIdentifier(request.getContainerToken());
+         verifyAndGetContainerTokenIdentifier(request.getContainerToken(),
+             containerTokenIdentifier);
+         containerId = containerTokenIdentifier.getContainerID();
+
+         // A container already known to this NM is being reinitialized.
+         Container previousContainer =
+             this.context.getContainers().get(containerId);
+         boolean isReinit = previousContainer != null;
+
+         performContainerPreStartChecks(nmTokenIdentifier,
+             request.getContainerLaunchContext(), containerTokenIdentifier,
+             isReinit);
+
+         if (isReinit) {
+           reinitializeContainerInternal(nmTokenIdentifier,
+               containerTokenIdentifier, request, previousContainer);
+         } else {
+           initializeContainerInternal(nmTokenIdentifier,
+               containerTokenIdentifier, request);
+         }
+
+         succeededContainers.add(containerId);
+       } catch (YarnException e) {
+         failedContainers.put(containerId, SerializedException.newInstance(e));
+       } catch (InvalidToken ie) {
+         failedContainers
+             .put(containerId, SerializedException.newInstance(ie));
+         throw ie;
+       } catch (IOException e) {
+         throw RPCUtil.getRemoteException(e);
+       }
+     }
+     return InitializeContainersResponse
+         .newInstance(getAuxServiceMetaData(), succeededContainers,
+             failedContainers);
+   }
+ }
+
class ContainerEventDispatcher implements EventHandler {
@Override
public void handle(ContainerEvent event) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationContainerInitEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationContainerInitEvent.java
index ef9e5a4..b874c90 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationContainerInitEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationContainerInitEvent.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.application;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerInitEvent;
@@ -34,14 +35,36 @@
*/
public class ApplicationContainerInitEvent extends ApplicationEvent {
final Container container;
+ final boolean startAfterInit;
+ private final ContainerLaunchContext updatedCtxt;
- public ApplicationContainerInitEvent(Container container) {
+ public ApplicationContainerInitEvent(Container container,
+ boolean startAfterInit, ContainerLaunchContext updatedCtxt) {
super(container.getContainerId().getApplicationAttemptId()
.getApplicationId(), ApplicationEventType.INIT_CONTAINER);
this.container = container;
+ this.startAfterInit = startAfterInit;
+ this.updatedCtxt = updatedCtxt;
+ }
+
+ /**
+  * Creates an INIT_CONTAINER event that carries no updated launch context.
+  *
+  * @param container the container to initialize
+  * @param startAfterInit whether the container should be started once
+  *        initialization completes
+  */
+ public ApplicationContainerInitEvent(Container container,
+     boolean startAfterInit) {
+   // Delegate so the super call and field assignments live in one place.
+   this(container, startAfterInit, null);
}
Container getContainer() {
return container;
}
+
+ public boolean shouldStartAfterInit() {
+ return this.startAfterInit;
+ }
+
+ public ContainerLaunchContext getUpdatedContainerLaunchContext() {
+ return this.updatedCtxt;
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationEventType.java
index b4ba76a..04f30b5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationEventType.java
@@ -23,6 +23,7 @@
// Source: ContainerManager
INIT_APPLICATION,
INIT_CONTAINER,
+ START_CONTAINER,
FINISH_APPLICATION, // Source: LogAggregationService if init fails
// Source: ResourceLocalizationService
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
index fbc8453..4c5aa19 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
@@ -21,7 +21,9 @@
import java.io.IOException;
import java.util.EnumSet;
import java.util.HashMap;
+import java.util.LinkedList;
import java.util.Map;
+import java.util.Queue;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
@@ -41,6 +43,7 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerInitEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerStartEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
@@ -78,6 +81,9 @@
Map containers =
new HashMap();
+
+ // Container init events that arrive while the application is still NEW or
+ // INITING; they are queued here and dispatched in FIFO order once the
+ // application has finished initializing.
+ Queue<ContainerInitEvent> pendingContainerInitEvents =
+     new LinkedList<ContainerInitEvent>();
public ApplicationImpl(Dispatcher dispatcher, String user, ApplicationId appId,
Credentials credentials, Context context) {
@@ -167,6 +173,10 @@ public ApplicationState getApplicationState() {
new InitContainerTransition())
.addTransition(ApplicationState.RUNNING,
ApplicationState.RUNNING,
+ ApplicationEventType.START_CONTAINER,
+ new StartContainerTransition())
+ .addTransition(ApplicationState.RUNNING,
+ ApplicationState.RUNNING,
ApplicationEventType.APPLICATION_CONTAINER_FINISHED,
CONTAINER_DONE_TRANSITION)
.addTransition(
@@ -302,15 +312,18 @@ public void transition(ApplicationImpl app, ApplicationEvent event) {
app.containers.put(container.getContainerId(), container);
LOG.info("Adding " + container.getContainerId()
+ " to application " + app.toString());
-
+
+ ContainerInitEvent containerInitEvent = new ContainerInitEvent(
+ container.getContainerId(), initEvent.shouldStartAfterInit());
+
switch (app.getApplicationState()) {
case RUNNING:
- app.dispatcher.getEventHandler().handle(new ContainerInitEvent(
- container.getContainerId()));
+ app.dispatcher.getEventHandler().handle(containerInitEvent);
break;
case INITING:
case NEW:
// these get queued up and sent out in AppInitDoneTransition
+ app.pendingContainerInitEvents.add(containerInitEvent);
break;
default:
assert false : "Invalid state for InitContainerTransition: " +
@@ -325,9 +338,9 @@ public void transition(ApplicationImpl app, ApplicationEvent event) {
@Override
public void transition(ApplicationImpl app, ApplicationEvent event) {
// Start all the containers waiting for ApplicationInit
- for (Container container : app.containers.values()) {
- app.dispatcher.getEventHandler().handle(new ContainerInitEvent(
- container.getContainerId()));
+ while (!app.pendingContainerInitEvents.isEmpty()) {
+ app.dispatcher.getEventHandler()
+ .handle(app.pendingContainerInitEvents.remove());
}
}
}
@@ -442,6 +455,20 @@ public void transition(ApplicationImpl app, ApplicationEvent event) {
}
}
+ /**
+  * Handles START_CONTAINER while the application is RUNNING: forwards a
+  * ContainerStartEvent for the container named by the application event to
+  * the dispatcher. The application state itself does not change.
+  */
+ @SuppressWarnings("unchecked")
+ static class StartContainerTransition implements
+     SingleArcTransition<ApplicationImpl, ApplicationEvent> {
+   @Override
+   public void transition(ApplicationImpl app, ApplicationEvent event) {
+     // The state machine only routes START_CONTAINER here, so the event is
+     // expected to be an ApplicationContainerStartEvent.
+     ApplicationContainerStartEvent startEvent =
+         (ApplicationContainerStartEvent) event;
+     Container container = startEvent.getContainer();
+
+     app.dispatcher.getEventHandler().handle(new ContainerStartEvent(
+         container.getContainerId()));
+   }
+ }
+
@Override
public void handle(ApplicationEvent event) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
index 7571964..c3ea338 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
@@ -67,6 +67,16 @@
void setLogDir(String logDir);
+ /**
+  * @return whether the container should be started automatically once its
+  *         initialization (resource localization) has completed
+  */
+ boolean shouldStartAfterInit();
+
+ /**
+  * @param startAfterInit whether the container should be started
+  *        automatically once its initialization has completed
+  */
+ void setStartAfterInit(boolean startAfterInit);
+
+ /**
+  * @return the time to wait before destroying the container after
+  *         completion (unit not visible here; confirm against callers)
+  */
+ long getDestroyDelay();
+
+ /**
+  * @return whether the container may be restarted after it stops; in
+  *         ContainerImpl this holds when the destroy delay is positive
+  */
+ boolean allowsRestart();
+
+ /**
+  * @return whether the container is initialized but waiting for an explicit
+  *         start request; in ContainerImpl this means LOCALIZED state with
+  *         the start-after-init flag unset
+  */
+ boolean isAwaitingStart();
+
String toString();
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java
index 5622f8c..815e4df 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java
@@ -22,6 +22,8 @@
// Producer: ContainerManager
INIT_CONTAINER,
+ START_CONTAINER,
+ STOP_CONTAINER,
KILL_CONTAINER,
UPDATE_DIAGNOSTICS_MSG,
CONTAINER_DONE,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
index b1ddc2e..4c2677f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
@@ -29,6 +29,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -38,6 +39,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
@@ -58,11 +60,13 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationContainerFinishedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.CleanupContainersLauncherEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationReleaseEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadEvent;
@@ -113,6 +117,9 @@
private int remainingRetryAttempts;
private String workDir;
private String logDir;
+ // the time to wait before destroying the container after completion
+ private long destroyDelay;
+ private AtomicBoolean startAfterInit = new AtomicBoolean(false);
/** The NM-wide configuration - not specific to this container */
private final Configuration daemonConf;
@@ -143,7 +150,8 @@
public ContainerImpl(Configuration conf, Dispatcher dispatcher,
ContainerLaunchContext launchContext, Credentials creds,
NodeManagerMetrics metrics,
- ContainerTokenIdentifier containerTokenIdentifier, Context context) {
+ ContainerTokenIdentifier containerTokenIdentifier, Context context,
+ long destroyDelay) {
this.daemonConf = conf;
this.dispatcher = dispatcher;
this.stateStore = context.getNMStateStore();
@@ -158,6 +166,7 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher,
this.diagnosticsMaxSize = conf.getInt(
YarnConfiguration.NM_CONTAINER_DIAGNOSTICS_MAXIMUM_SIZE,
YarnConfiguration.DEFAULT_NM_CONTAINER_DIAGNOSTICS_MAXIMUM_SIZE);
+ this.destroyDelay = destroyDelay;
this.containerTokenIdentifier = containerTokenIdentifier;
this.containerId = containerTokenIdentifier.getContainerID();
this.resource = containerTokenIdentifier.getResource();
@@ -188,6 +197,14 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher,
stateMachine = stateMachineFactory.make(this);
}
+ /**
+  * Convenience constructor that preserves the pre-existing signature: it
+  * delegates to the full constructor with a destroy delay of 0, which means
+  * allowsRestart() is false for containers created this way.
+  */
+ public ContainerImpl(Configuration conf, Dispatcher dispatcher,
+ ContainerLaunchContext launchContext, Credentials creds,
+ NodeManagerMetrics metrics,
+ ContainerTokenIdentifier containerTokenIdentifier, Context context) {
+ this(conf, dispatcher, launchContext, creds, metrics,
+ containerTokenIdentifier, context, 0L);
+ }
+
// constructor for a recovered container
public ContainerImpl(Configuration conf, Dispatcher dispatcher,
ContainerLaunchContext launchContext, Credentials creds,
@@ -210,6 +227,7 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher,
this.remainingRetryAttempts = rcs.getRemainingRetryAttempts();
this.workDir = rcs.getWorkDir();
this.logDir = rcs.getLogDir();
+ this.destroyDelay = rcs.getDestroyDelay();
}
private static final ContainerDiagnosticsUpdateTransition UPDATE_DIAGNOSTICS_TRANSITION =
@@ -274,20 +292,37 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher,
// From LOCALIZED State
.addTransition(ContainerState.LOCALIZED, ContainerState.RUNNING,
ContainerEventType.CONTAINER_LAUNCHED, new LaunchTransition())
- .addTransition(ContainerState.LOCALIZED, ContainerState.EXITED_WITH_FAILURE,
- ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
- new ExitedWithFailureTransition(true))
.addTransition(ContainerState.LOCALIZED, ContainerState.LOCALIZED,
ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
UPDATE_DIAGNOSTICS_TRANSITION)
.addTransition(ContainerState.LOCALIZED, ContainerState.KILLING,
ContainerEventType.KILL_CONTAINER, new KillTransition())
+ .addTransition(ContainerState.LOCALIZED, ContainerState.LAUNCHING,
+ ContainerEventType.START_CONTAINER,
+ new StartContainerTransition())
+ .addTransition(ContainerState.LOCALIZED,
+ EnumSet.of(ContainerState.LOCALIZING,
+ ContainerState.LOCALIZED), ContainerEventType.INIT_CONTAINER,
+ new RequestResourcesTransition())
+ // From LAUNCHING State
+ .addTransition(ContainerState.LAUNCHING, ContainerState.LAUNCHING,
+ ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
+ UPDATE_DIAGNOSTICS_TRANSITION)
+ .addTransition(ContainerState.LAUNCHING, ContainerState.RUNNING,
+ ContainerEventType.CONTAINER_LAUNCHED, new LaunchTransition())
+ .addTransition(ContainerState.LAUNCHING, ContainerState.EXITED_WITH_FAILURE,
+ ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
+ new ExitedWithFailureTransition(true))
+ .addTransition(ContainerState.LAUNCHING, ContainerState.KILLING,
+ ContainerEventType.KILL_CONTAINER, new KillTransition())
+
// From RUNNING State
.addTransition(ContainerState.RUNNING,
- ContainerState.EXITED_WITH_SUCCESS,
+ EnumSet.of(ContainerState.EXITED_WITH_SUCCESS,
+ ContainerState.LOCALIZED),
ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
- new ExitedWithSuccessTransition(true))
+ new ExitedWithSuccessTransition())
.addTransition(ContainerState.RUNNING,
EnumSet.of(ContainerState.RELAUNCHING,
ContainerState.EXITED_WITH_FAILURE),
@@ -301,7 +336,9 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher,
.addTransition(ContainerState.RUNNING, ContainerState.EXITED_WITH_FAILURE,
ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
new KilledExternallyTransition())
-
+ .addTransition(ContainerState.RUNNING, ContainerState.STOPPING,
+ ContainerEventType.STOP_CONTAINER, new StopTransition())
+
// From RELAUNCHING State
.addTransition(ContainerState.RELAUNCHING, ContainerState.RUNNING,
ContainerEventType.CONTAINER_LAUNCHED, new LaunchTransition())
@@ -315,6 +352,20 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher,
.addTransition(ContainerState.RELAUNCHING, ContainerState.KILLING,
ContainerEventType.KILL_CONTAINER, new KillTransition())
+ // From STOPPING State
+ .addTransition(ContainerState.STOPPING, ContainerState.STOPPING,
+ ContainerEventType.STOP_CONTAINER, new StopTransition())
+ .addTransition(ContainerState.STOPPING, ContainerState.STOPPING,
+ ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
+ UPDATE_DIAGNOSTICS_TRANSITION)
+ .addTransition(ContainerState.STOPPING, ContainerState.LOCALIZED,
+ EnumSet.of(ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
+ ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
+ ContainerEventType.CONTAINER_EXITED_WITH_FAILURE),
+ new ContainerStoppedTransition())
+ .addTransition(ContainerState.STOPPING, ContainerState.KILLING,
+ ContainerEventType.KILL_CONTAINER, new KillTransition())
+
// From CONTAINER_EXITED_WITH_SUCCESS State
.addTransition(ContainerState.EXITED_WITH_SUCCESS, ContainerState.DONE,
ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP,
@@ -358,7 +409,7 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher,
ContainerEventType.KILL_CONTAINER)
.addTransition(ContainerState.KILLING, ContainerState.EXITED_WITH_SUCCESS,
ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
- new ExitedWithSuccessTransition(false))
+ new ExitedWithSuccessOnKillingTransition())
.addTransition(ContainerState.KILLING, ContainerState.EXITED_WITH_FAILURE,
ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
new ExitedWithFailureTransition(false))
@@ -412,13 +463,18 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher,
stateMachine;
public org.apache.hadoop.yarn.api.records.ContainerState getCurrentState() {
+ if(this.isAwaitingStart()) {
+ return org.apache.hadoop.yarn.api.records.ContainerState.AWAITING_START;
+ }
switch (stateMachine.getCurrentState()) {
case NEW:
case LOCALIZING:
case LOCALIZATION_FAILED:
case LOCALIZED:
+ case LAUNCHING:
case RUNNING:
case RELAUNCHING:
+ case STOPPING:
case EXITED_WITH_SUCCESS:
case EXITED_WITH_FAILURE:
case KILLING:
@@ -446,6 +502,7 @@ public String getUser() {
this.readLock.lock();
try {
if (ContainerState.LOCALIZED == getContainerState()
+ || ContainerState.LAUNCHING == getContainerState()
|| ContainerState.RELAUNCHING == getContainerState()) {
return localizedResources;
} else {
@@ -559,6 +616,37 @@ public void setLogDir(String logDir) {
this.logDir = logDir;
}
+ /** Returns the current value of the atomic start-after-init flag. */
+ @Override
+ public boolean shouldStartAfterInit() {
+ return this.startAfterInit.get();
+ }
+
+ /** Atomically sets the start-after-init flag to the given value. */
+ @Override
+ public void setStartAfterInit(boolean startAfterInit) {
+ this.startAfterInit.set(startAfterInit);
+ }
+
+ /**
+  * Returns the time to wait before destroying the container after
+  * completion, as supplied at construction or recovery.
+  */
+ @Override
+ public long getDestroyDelay() {
+ return destroyDelay;
+ }
+
+ /** A container allows restart exactly when its destroy delay is positive. */
+ @Override
+ public boolean allowsRestart() {
+ return destroyDelay > 0;
+ }
+
+ /**
+  * Reports whether the container is in the LOCALIZED state without the
+  * start-after-init flag set, i.e. it will not start until asked to.
+  * Evaluated while holding the read lock.
+  */
+ @Override
+ public boolean isAwaitingStart() {
+   this.readLock.lock();
+   try {
+     if (ContainerState.LOCALIZED != this.getContainerState()) {
+       return false;
+     }
+     return !this.startAfterInit.get();
+   } finally {
+     this.readLock.unlock();
+   }
+ }
+
@SuppressWarnings("unchecked")
private void sendFinishedEvents() {
// Inform the application
@@ -579,6 +667,11 @@ private void sendLaunchEvent() {
if (recoveredStatus == RecoveredContainerStatus.LAUNCHED) {
// try to recover a container that was previously launched
launcherEvent = ContainersLauncherEventType.RECOVER_CONTAINER;
+ // For a container that allows restart, reset the recovered status so
+ // that later launches do not try to recover the container again.
+ if(allowsRestart()) {
+ recoveredStatus = RecoveredContainerStatus.REQUESTED;
+ }
}
containerLaunchStartTime = clock.getTime();
dispatcher.getEventHandler().handle(
@@ -586,6 +679,12 @@ private void sendLaunchEvent() {
}
@SuppressWarnings("unchecked") // dispatcher not typed
+ private void sendStartEvent() {
+   // Ask the dispatcher to deliver a START_CONTAINER event for this
+   // container.
+   ContainerStartEvent startEvent =
+       new ContainerStartEvent(this.getContainerId());
+   dispatcher.getEventHandler().handle(startEvent);
+ }
+
+ @SuppressWarnings("unchecked") // dispatcher not typed
private void sendRelaunchEvent() {
ContainersLauncherEventType launcherEvent =
ContainersLauncherEventType.RELAUNCH_CONTAINER;
@@ -647,6 +746,39 @@ public void cleanup() {
new ContainerLocalizationCleanupEvent(this, rsrc));
}
+ /**
+  * Releases every local resource currently held by this container.
+  *
+  * The public, private and application resource lists are each snapshotted
+  * and cleared; the snapshots are sent to the dispatcher in a single
+  * ContainerLocalizationReleaseEvent, and the localized-resources map is
+  * emptied. The event is dispatched even when no resources were held.
+  */
+ @SuppressWarnings("unchecked") // dispatcher not typed
+ public void releaseLocalResources() {
+   Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrc =
+       new HashMap<LocalResourceVisibility,
+           Collection<LocalResourceRequest>>();
+   if (!publicRsrcs.isEmpty()) {
+     Collection<LocalResourceRequest> releasePublicRsrcs =
+         new ArrayList<LocalResourceRequest>(publicRsrcs);
+     publicRsrcs.clear();
+     rsrc.put(LocalResourceVisibility.PUBLIC, releasePublicRsrcs);
+   }
+   if (!privateRsrcs.isEmpty()) {
+     Collection<LocalResourceRequest> releasePrivateRsrcs =
+         new ArrayList<LocalResourceRequest>(privateRsrcs);
+     privateRsrcs.clear();
+     // Publish the snapshot, not the list that was just cleared.
+     rsrc.put(LocalResourceVisibility.PRIVATE, releasePrivateRsrcs);
+   }
+   if (!appRsrcs.isEmpty()) {
+     Collection<LocalResourceRequest> releaseAppRsrcs =
+         new ArrayList<LocalResourceRequest>(appRsrcs);
+     appRsrcs.clear();
+     // Publish the snapshot, not the list that was just cleared.
+     rsrc.put(LocalResourceVisibility.APPLICATION, releaseAppRsrcs);
+   }
+   dispatcher.getEventHandler().handle(
+       new ContainerLocalizationReleaseEvent(this, rsrc));
+   localizedResources.clear();
+   // TODO: should we also clear resourcesToBeUploaded and
+   // resourcesUploadPolicies?
+ }
+
static class ContainerTransition implements
SingleArcTransition {
@@ -695,7 +827,43 @@ public ContainerState transition(ContainerImpl container,
final ContainerLaunchContext ctxt = container.launchContext;
container.metrics.initingContainer();
+
+ ContainerInitEvent initEvent = (ContainerInitEvent) event;
+ container.setStartAfterInit(initEvent.shouldStartAfterInit());
+ final StartContainerRequest updatedStartRequest =
+ initEvent.getUpdatedStartRequest();
+
+ if (updatedStartRequest != null) {
+   // application master has requested a container reinitialization:
+   // we must update the container launch context, persist it, and free
+   // the current local resources
+
+   final ContainerLaunchContext updatedCtxt =
+       updatedStartRequest.getContainerLaunchContext();
+
+   // Apply the update before building the request that is persisted
+   // below, so the state store receives the new launch context rather
+   // than the one being replaced.
+   ctxt.setCommands(updatedCtxt.getCommands());
+   ctxt.setLocalResources(updatedCtxt.getLocalResources());
+   ctxt.setEnvironment(updatedCtxt.getEnvironment());
+   ctxt.setServiceData(updatedCtxt.getServiceData());
+   // TODO: do we want to update other container launch context fields
+   // after reinitialization?
+
+   StartContainerRequest storedStartRequest = StartContainerRequest
+       .newInstance(ctxt, updatedStartRequest.getContainerToken());
+   try {
+     container.context.getNMStateStore()
+         .storeContainer(container.getContainerId(), storedStartRequest);
+   } catch (IOException e) {
+     // TODO: is it ok to just report a warning, or should we do a no-op
+     // instead of the reinitialization?
+     LOG.warn(
+         "Unable to update container launch context in state store for "
+             + container.getContainerId(), e);
+   }
+
+   container.releaseLocalResources();
+ }
+
container.dispatcher.getEventHandler().handle(new AuxServicesEvent
(AuxServicesEventType.CONTAINER_INIT, container));
@@ -771,7 +939,9 @@ public ContainerState transition(ContainerImpl container,
new ContainerLocalizationRequestEvent(container, req));
return ContainerState.LOCALIZING;
} else {
- container.sendLaunchEvent();
+ if(initEvent.shouldStartAfterInit()) {
+ container.sendStartEvent();
+ }
container.metrics.endInitingContainer();
return ContainerState.LOCALIZED;
}
@@ -835,7 +1005,9 @@ public ContainerState transition(ContainerImpl container,
new ContainerLocalizationEvent(LocalizationEventType.
CONTAINER_RESOURCES_LOCALIZED, container));
- container.sendLaunchEvent();
+ if(container.startAfterInit.get()) {
+ container.sendStartEvent();
+ }
container.metrics.endInitingContainer();
// If this is a recovered container that has already launched, skip
@@ -859,8 +1031,9 @@ public ContainerState transition(ContainerImpl container,
}
/**
- * Transition from LOCALIZED state to RUNNING state upon receiving
- * a CONTAINER_LAUNCHED event
+ * Transitions upon receiving CONTAINER_LAUNCHED:
+ * - LAUNCHING -> RUNNING
+ * - RELAUNCHING -> RUNNING
+ * - LOCALIZED -> RUNNING (used for container recovery)
*/
static class LaunchTransition extends ContainerTransition {
@SuppressWarnings("unchecked")
@@ -869,44 +1042,53 @@ public void transition(ContainerImpl container, ContainerEvent event) {
container.sendContainerMonitorStartEvent();
container.metrics.runningContainer();
container.wasLaunched = true;
+ container.setStartAfterInit(false);
if (container.recoveredAsKilled) {
LOG.info("Killing " + container.containerId
+ " due to recovered as killed");
container.addDiagnostics("Container recovered as killed.\n");
container.dispatcher.getEventHandler().handle(
- new ContainersLauncherEvent(container,
- ContainersLauncherEventType.CLEANUP_CONTAINER));
+ new CleanupContainersLauncherEvent(container, true));
}
}
}
/**
- * Transition from RUNNING or KILLING state to EXITED_WITH_SUCCESS state
- * upon EXITED_WITH_SUCCESS message.
+ * Transition from RUNNING state to EXITED_WITH_SUCCESS or
+ * LOCALIZED state upon EXITED_WITH_SUCCESS message.
*/
@SuppressWarnings("unchecked") // dispatcher not typed
- static class ExitedWithSuccessTransition extends ContainerTransition {
+ static class ExitedWithSuccessTransition implements
+ MultipleArcTransition<ContainerImpl, ContainerEvent, ContainerState> {
- boolean clCleanupRequired;
+ @Override
+ public ContainerState transition(ContainerImpl container, ContainerEvent event) {
+ // Set exit code to 0 on success
+ container.exitCode = 0;
- public ExitedWithSuccessTransition(boolean clCleanupRequired) {
- this.clCleanupRequired = clCleanupRequired;
+ if (container.allowsRestart()) {
+ return ContainerState.LOCALIZED;
+ }
+ // Not restartable: ask the launcher to clean up, then finish the container.
+ container.dispatcher.getEventHandler().handle(
+ new CleanupContainersLauncherEvent(container, true));
+ container.cleanup();
+
+ return ContainerState.EXITED_WITH_SUCCESS;
}
+ }
+
+ /**
+ * Transition from KILLING state to EXITED_WITH_SUCCESS state
+ * upon EXITED_WITH_SUCCESS message.
+ */
+ static class ExitedWithSuccessOnKillingTransition extends ContainerTransition {
@Override
public void transition(ContainerImpl container, ContainerEvent event) {
- // Set exit code to 0 on success
+ // Set exit code to 0 on success
container.exitCode = 0;
-
- // TODO: Add containerWorkDir to the deletion service.
-
- if (clCleanupRequired) {
- container.dispatcher.getEventHandler().handle(
- new ContainersLauncherEvent(container,
- ContainersLauncherEventType.CLEANUP_CONTAINER));
- }
-
container.cleanup();
}
}
@@ -932,13 +1114,9 @@ public void transition(ContainerImpl container, ContainerEvent event) {
container.addDiagnostics(exitEvent.getDiagnosticInfo(), "\n");
}
- // TODO: Add containerWorkDir to the deletion service.
- // TODO: Add containerOuputDir to the deletion service.
-
if (clCleanupRequired) {
container.dispatcher.getEventHandler().handle(
- new ContainersLauncherEvent(container,
- ContainersLauncherEventType.CLEANUP_CONTAINER));
+ new CleanupContainersLauncherEvent(container, true));
}
container.cleanup();
@@ -1013,6 +1191,20 @@ public void run() {
}
}
+ /**
+ * Transition from LOCALIZED to LAUNCHING
+ * upon receiving START_CONTAINER event.
+ */
+ static class StartContainerTransition implements
+ SingleArcTransition<ContainerImpl, ContainerEvent> {
+
+ @Override
+ public void transition(ContainerImpl container, ContainerEvent event) {
+ // TODO: if allows restart, stop monitoring for container destroy
+ container.sendLaunchEvent();
+ }
+ }
+
@Override
public boolean isRetryContextSet() {
return containerRetryContext.getRetryPolicy()
@@ -1127,8 +1319,7 @@ public void transition(ContainerImpl container, ContainerEvent event) {
public void transition(ContainerImpl container, ContainerEvent event) {
// Kill the process/process-grp
container.dispatcher.getEventHandler().handle(
- new ContainersLauncherEvent(container,
- ContainersLauncherEventType.CLEANUP_CONTAINER));
+ new CleanupContainersLauncherEvent(container, true));
ContainerKillEvent killEvent = (ContainerKillEvent) event;
container.addDiagnostics(killEvent.getDiagnostic(), "\n");
container.exitCode = killEvent.getContainerExitStatus();
@@ -1136,6 +1327,22 @@ public void transition(ContainerImpl container, ContainerEvent event) {
}
/**
+ * Transitions from RUNNING to STOPPING upon receiving STOP_CONTAINER
+ */
+ @SuppressWarnings("unchecked") // dispatcher not typed
+ static class StopTransition implements
+ SingleArcTransition<ContainerImpl, ContainerEvent> {
+ @Override
+ public void transition(ContainerImpl container, ContainerEvent event) {
+ LOG.info("Stopping Container " + container.getContainerId() + ".");
+ container.dispatcher.getEventHandler().handle(
+ new CleanupContainersLauncherEvent(container, false));
+ container.addDiagnostics("Container is being stopped by application master.",
+ "\n");
+ }
+ }
+
+ /**
* Transition from KILLING to CONTAINER_CLEANEDUP_AFTER_KILL
* upon receiving CONTAINER_KILLED_ON_REQUEST.
*/
@@ -1159,6 +1366,29 @@ public void transition(ContainerImpl container, ContainerEvent event) {
}
/**
+ * Transition from STOPPING to LOCALIZED upon receiving
+ * CONTAINER_KILLED_ON_REQUEST or CONTAINER_EXITED_WITH_SUCCESS.
+ */
+ static class ContainerStoppedTransition implements
+ SingleArcTransition<ContainerImpl, ContainerEvent> {
+ @Override
+ public void transition(ContainerImpl container, ContainerEvent event) {
+ // TODO: should use an explicit exit code for stopping?
+ container.exitCode = ContainerExitStatus.KILLED_BY_APPMASTER;
+ // NOTE(review): a success event may be a plain ContainerEvent, so guard
+ // the cast instead of assuming ContainerExitEvent - confirm with senders.
+ if (event instanceof ContainerExitEvent
+ && ((ContainerExitEvent) event).getDiagnosticInfo() != null) {
+ container.addDiagnostics(
+ ((ContainerExitEvent) event).getDiagnosticInfo(), "\n");
+ }
+ container.wasLaunched = false;
+ container.metrics.endRunningContainer();
+ // TODO: if allows restart, start monitoring container for destroy delay
+ }
+ }
+
+ /**
* Handle the following transitions:
* - {LOCALIZATION_FAILED, EXITED_WITH_SUCCESS, EXITED_WITH_FAILURE,
* KILLING, CONTAINER_CLEANEDUP_AFTER_KILL}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerInitEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerInitEvent.java
index 56421d9..d1e2e9e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerInitEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerInitEvent.java
@@ -17,12 +17,31 @@
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.records.ContainerId;
public class ContainerInitEvent extends ContainerEvent {
+ private final boolean startAfterInit;
+ private final StartContainerRequest updatedStartRequest;
- public ContainerInitEvent(ContainerId c) {
+ public ContainerInitEvent(ContainerId c, boolean startAfterInit,
+ StartContainerRequest updatedStartRequest) {
super(c, ContainerEventType.INIT_CONTAINER);
+ this.startAfterInit = startAfterInit;
+ this.updatedStartRequest = updatedStartRequest;
}
+ public ContainerInitEvent(ContainerId c, boolean startAfterInit) {
+ super(c, ContainerEventType.INIT_CONTAINER);
+ this.startAfterInit = startAfterInit;
+ this.updatedStartRequest = null;
+ }
+
+ public boolean shouldStartAfterInit() {
+ return this.startAfterInit;
+ }
+
+ public StartContainerRequest getUpdatedStartRequest() {
+ return this.updatedStartRequest;
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java
index 6b96204..5dbca1a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java
@@ -19,7 +19,7 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
public enum ContainerState {
- NEW, LOCALIZING, LOCALIZATION_FAILED, LOCALIZED, RUNNING, RELAUNCHING,
- EXITED_WITH_SUCCESS, EXITED_WITH_FAILURE, KILLING,
+ NEW, LOCALIZING, LOCALIZATION_FAILED, LOCALIZED, LAUNCHING, RUNNING,
+ RELAUNCHING, STOPPING, EXITED_WITH_SUCCESS, EXITED_WITH_FAILURE, KILLING,
CONTAINER_CLEANEDUP_AFTER_KILL, CONTAINER_RESOURCES_CLEANINGUP, DONE
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
index a3b53e3..9d70393 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
@@ -178,9 +178,14 @@ public Integer call() {
String appIdStr = app.getAppId().toString();
String relativeContainerLogDir = ContainerLaunch
.getRelativeContainerLogDir(appIdStr, containerIdStr);
- containerLogDir =
- dirsHandler.getLogPathForWrite(relativeContainerLogDir, false);
- recordContainerLogDir(containerID, containerLogDir.toString());
+ String recordedContainerLogDir = container.getLogDir();
+ if(recordedContainerLogDir != null) {
+ containerLogDir = new Path(recordedContainerLogDir);
+ } else {
+ containerLogDir =
+ dirsHandler.getLogPathForWrite(relativeContainerLogDir, false);
+ recordContainerLogDir(containerID, containerLogDir.toString());
+ }
for (String str : command) {
// TODO: Should we instead work via symlinks without this grammar?
newCmds.add(expandEnvironment(str, containerLogDir));
@@ -215,14 +220,26 @@ public Integer call() {
DataOutputStream tokensOutStream = null;
// Select the working directory for the container
- Path containerWorkDir =
- dirsHandler.getLocalPathForWrite(ContainerLocalizer.USERCACHE
- + Path.SEPARATOR + user + Path.SEPARATOR
- + ContainerLocalizer.APPCACHE + Path.SEPARATOR + appIdStr
- + Path.SEPARATOR + containerIdStr,
- LocalDirAllocator.SIZE_UNKNOWN, false);
- recordContainerWorkDir(containerID, containerWorkDir.toString());
-
+ String recordedWorkDir = container.getWorkDir();
+ Path containerWorkDir;
+ if(recordedWorkDir != null) {
+ containerWorkDir = new Path(recordedWorkDir);
+ // remove content of working directory for a clean start
+ // since data from a previous run might still be present
+ LOG.info("Removing working directory: " + recordedWorkDir
+ + " for container: " + containerIdStr);
+ lfs.delete(containerWorkDir, true);
+
+ // TODO: let the application master specify a policy for data removal
+ } else {
+ containerWorkDir =
+ dirsHandler.getLocalPathForWrite(ContainerLocalizer.USERCACHE
+ + Path.SEPARATOR + user + Path.SEPARATOR
+ + ContainerLocalizer.APPCACHE + Path.SEPARATOR + appIdStr
+ + Path.SEPARATOR + containerIdStr,
+ LocalDirAllocator.SIZE_UNKNOWN, false);
+ recordContainerWorkDir(containerID, containerWorkDir.toString());
+ }
String pidFileSubpath = getPidFileSubpath(appIdStr, containerIdStr);
// pid file should be in nm private dir so that it is not
@@ -391,7 +408,8 @@ protected void setContainerCompletedStatus(int exitCode) {
completed.set(true);
exec.deactivateContainer(containerId);
try {
- if (!container.shouldRetry(exitCode)) {
+ if (!container.shouldRetry(exitCode) && !(container.allowsRestart()
+ && container.getContainerState() == ContainerState.STOPPING)) {
context.getNMStateStore().storeContainerCompleted(containerId,
exitCode);
}
@@ -518,16 +536,18 @@ protected String getPidFileSubpath(String appIdStr, String containerIdStr) {
* @throws IOException
*/
@SuppressWarnings("unchecked") // dispatcher not typed
- public void cleanupContainer() throws IOException {
+ public void cleanupContainer(boolean storeAsCompleted) throws IOException {
ContainerId containerId = container.getContainerId();
String containerIdStr = ConverterUtils.toString(containerId);
LOG.info("Cleaning up container " + containerIdStr);
- try {
- context.getNMStateStore().storeContainerKilled(containerId);
- } catch (IOException e) {
- LOG.error("Unable to mark container " + containerId
- + " killed in store", e);
+ if(storeAsCompleted) {
+ try {
+ context.getNMStateStore().storeContainerKilled(containerId);
+ } catch (IOException e) {
+ LOG.error("Unable to mark container " + containerId
+ + " killed in store", e);
+ }
}
// launch flag will be set to true if process already launched
@@ -1163,7 +1183,7 @@ public static String getExitCodeFile(String pidFile) {
private void recordContainerLogDir(ContainerId containerId,
String logDir) throws IOException{
- if (container.isRetryContextSet()) {
+ if (container.isRetryContextSet() || container.allowsRestart()) {
container.setLogDir(logDir);
context.getNMStateStore().storeContainerLogDir(containerId, logDir);
}
@@ -1171,7 +1191,7 @@ private void recordContainerLogDir(ContainerId containerId,
private void recordContainerWorkDir(ContainerId containerId,
String workDir) throws IOException{
- if (container.isRetryContextSet()) {
+ if (container.isRetryContextSet() || container.allowsRestart()) {
container.setWorkDir(workDir);
context.getNMStateStore().storeContainerWorkDir(containerId, workDir);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
index e5fff00..118e64f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
@@ -137,6 +137,8 @@ public void handle(ContainersLauncherEvent event) {
running.put(containerId, launch);
break;
case CLEANUP_CONTAINER:
+ CleanupContainersLauncherEvent cleanupEvent =
+ (CleanupContainersLauncherEvent) event;
ContainerLaunch launcher = running.remove(containerId);
if (launcher == null) {
// Container not launched. So nothing needs to be done.
@@ -146,7 +148,7 @@ public void handle(ContainersLauncherEvent event) {
// Cleanup a container whether it is running/killed/completed, so that
// no sub-processes are alive.
try {
- launcher.cleanupContainer();
+ launcher.cleanupContainer(cleanupEvent.getStoreAsCompleted());
} catch (IOException e) {
LOG.warn("Got exception while cleaning container " + containerId
+ ". Ignoring.");
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoveredContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoveredContainerLaunch.java
index 66f5a2a..3848e53 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoveredContainerLaunch.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoveredContainerLaunch.java
@@ -71,6 +71,20 @@ public Integer call() {
containerId.getApplicationAttemptId().getApplicationId());
String containerIdStr = ConverterUtils.toString(containerId);
+ if (container.allowsRestart()
+ && locatePidFile(appIdStr, containerIdStr) == null) {
+ // either the container was not started or was waiting
+ // for start, in both cases we leave the container in localized state
+ // waiting for requests from the application master
+ container.setStartAfterInit(false);
+
+ // TODO: start monitoring container for destroy delay
+
+ return 0;
+ }
+
+ // TODO: recover stopping state for containers that allows restart
+
dispatcher.getEventHandler().handle(new ContainerEvent(containerId,
ContainerEventType.CONTAINER_LAUNCHED));
@@ -97,14 +111,7 @@ public Integer call() {
notInterrupted = false;
} finally {
if (notInterrupted) {
- this.completed.set(true);
- exec.deactivateContainer(containerId);
- try {
- getContext().getNMStateStore().storeContainerCompleted(containerId,
- retCode);
- } catch (IOException e) {
- LOG.error("Unable to set exit code for container " + containerId);
- }
+ setContainerCompletedStatus(retCode);
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
index b2413ad..ae98e82 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
@@ -112,6 +112,7 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationReleaseEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
@@ -423,6 +424,9 @@ public void handle(LocalizationEvent event) {
case CLEANUP_CONTAINER_RESOURCES:
handleCleanupContainerResources((ContainerLocalizationCleanupEvent)event);
break;
+ case RELEASE_CONTAINER_RESOURCES:
+ handleReleaseContainerResources((ContainerLocalizationReleaseEvent)event);
+ break;
case DESTROY_APPLICATION_RESOURCES:
handleDestroyApplicationResources(
((ApplicationLocalizationEvent)event).getApplication());
@@ -516,21 +520,9 @@ private void handleCacheCleanup(LocalizationEvent event) {
private void handleCleanupContainerResources(
ContainerLocalizationCleanupEvent rsrcCleanup) {
Container c = rsrcCleanup.getContainer();
- Map> rsrcs =
- rsrcCleanup.getResources();
- for (Map.Entry> e :
- rsrcs.entrySet()) {
- LocalResourcesTracker tracker = getLocalResourcesTracker(e.getKey(), c.getUser(),
- c.getContainerId().getApplicationAttemptId()
- .getApplicationId());
- for (LocalResourceRequest req : e.getValue()) {
- tracker.handle(new ResourceReleaseEvent(req,
- c.getContainerId()));
- }
- }
- String locId = ConverterUtils.toString(c.getContainerId());
- localizerTracker.cleanupPrivLocalizers(locId);
-
+ handleReleaseContainerResources(
+ new ContainerLocalizationReleaseEvent(c, rsrcCleanup.getResources()));
+
// Delete the container directories
String userName = c.getUser();
String containerIDStr = c.toString();
@@ -563,7 +555,26 @@ private void handleCleanupContainerResources(
new ContainerEvent(c.getContainerId(),
ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP));
}
-
+
+ /** Sends a ResourceReleaseEvent for each of the container's resources
+ * and cleans up the container's private localizers. */
+ private void handleReleaseContainerResources(
+ ContainerLocalizationReleaseEvent rsrcRelease) {
+ Container c = rsrcRelease.getContainer();
+ Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs =
+ rsrcRelease.getResources();
+ for (Map.Entry<LocalResourceVisibility, Collection<LocalResourceRequest>> e :
+ rsrcs.entrySet()) {
+ LocalResourcesTracker tracker = getLocalResourcesTracker(e.getKey(), c.getUser(),
+ c.getContainerId().getApplicationAttemptId().getApplicationId());
+ for (LocalResourceRequest req : e.getValue()) {
+ tracker.handle(new ResourceReleaseEvent(req, c.getContainerId()));
+ }
+ }
+ String locId = ConverterUtils.toString(c.getContainerId());
+ localizerTracker.cleanupPrivLocalizers(locId);
+ }
+
private void submitDirForDeletion(String userName, Path dir) {
try {
lfs.getFileStatus(dir);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java
index 4785fba..06bdc8e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java
@@ -22,6 +22,7 @@
INIT_CONTAINER_RESOURCES,
CACHE_CLEANUP,
CLEANUP_CONTAINER_RESOURCES,
+ RELEASE_CONTAINER_RESOURCES,
DESTROY_APPLICATION_RESOURCES,
CONTAINER_RESOURCES_LOCALIZED,
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerMetrics.java
index d59abda..f21d2b4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerMetrics.java
@@ -264,7 +264,7 @@ public void recordCpuUsage(
}
public void recordProcessId(String processId) {
- registry.tag(PROCESSID_INFO, processId);
+ registry.tag(PROCESSID_INFO, processId, true);
}
public void recordResourceLimit(int vmemLimit, int pmemLimit, int cpuVcores) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
index 6e9efe1..002325f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
@@ -114,6 +114,7 @@
"/remainingRetryAttempts";
private static final String CONTAINER_WORK_DIR_KEY_SUFFIX = "/workdir";
private static final String CONTAINER_LOG_DIR_KEY_SUFFIX = "/logdir";
+ private static final String CONTAINER_DESTROY_DELAY_SUFFIX = "/destroydelay";
private static final String CURRENT_MASTER_KEY_SUFFIX = "CurrentMasterKey";
private static final String PREV_MASTER_KEY_SUFFIX = "PreviousMasterKey";
@@ -258,6 +259,8 @@ private RecoveredContainerState loadContainerState(ContainerId containerId,
rcs.setWorkDir(asString(entry.getValue()));
} else if (suffix.equals(CONTAINER_LOG_DIR_KEY_SUFFIX)) {
rcs.setLogDir(asString(entry.getValue()));
+ } else if (suffix.equals(CONTAINER_DESTROY_DELAY_SUFFIX)) {
+ rcs.setDestroyDelay(Long.parseLong(asString(entry.getValue())));
} else {
throw new IOException("Unexpected container state key: " + key);
}
@@ -404,6 +407,18 @@ public void storeContainerLogDir(ContainerId containerId,
}
@Override
+ public void storeContainerDestroyDelay(ContainerId containerId,
+ long destroyDelay) throws IOException {
+ String key = CONTAINERS_KEY_PREFIX + containerId.toString()
+ + CONTAINER_DESTROY_DELAY_SUFFIX;
+ try {
+ db.put(bytes(key), bytes(Long.toString(destroyDelay)));
+ } catch (DBException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
public void removeContainer(ContainerId containerId)
throws IOException {
if (LOG.isDebugEnabled()) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
index 08b80e9..b08a531 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
@@ -115,6 +115,11 @@ public void storeContainerLogDir(ContainerId containerId,
}
@Override
+ public void storeContainerDestroyDelay(ContainerId containerId,
+ long destroyDelay) throws IOException {
+ }
+
+ @Override
public void removeContainer(ContainerId containerId) throws IOException {
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
index ccf1e70..b71537d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
@@ -76,6 +76,7 @@ public NMStateStoreService(String name) {
private int remainingRetryAttempts = ContainerRetryContext.RETRY_INVALID;
private String workDir;
private String logDir;
+ private long destroyDelay;
public RecoveredContainerStatus getStatus() {
return status;
@@ -125,6 +126,14 @@ public void setLogDir(String logDir) {
this.logDir = logDir;
}
+ public long getDestroyDelay() {
+ return destroyDelay;
+ }
+
+ public void setDestroyDelay(long destroyDelay) {
+ this.destroyDelay = destroyDelay;
+ }
+
@Override
public String toString() {
return new StringBuffer("Status: ").append(getStatus())
@@ -136,6 +145,7 @@ public String toString() {
.append(", RemainingRetryAttempts: ").append(remainingRetryAttempts)
.append(", WorkDir: ").append(workDir)
.append(", LogDir: ").append(logDir)
+ .append(", DestroyDelay: ").append(destroyDelay)
.toString();
}
}
@@ -310,7 +320,7 @@ public abstract void removeApplication(ApplicationId appId)
*/
public abstract void storeContainer(ContainerId containerId,
StartContainerRequest startRequest) throws IOException;
-
+
/**
* Record that a container has been launched
* @param containerId the container ID
@@ -383,6 +393,15 @@ public abstract void storeContainerLogDir(
ContainerId containerId, String logDir) throws IOException;
/**
+ * Record destroy delay for a container.
+ * @param containerId the container ID
+ * @param destroyDelay the destroy delay
+ * @throws IOException
+ */
+ public abstract void storeContainerDestroyDelay(
+ ContainerId containerId, long destroyDelay) throws IOException;
+
+ /**
* Remove records corresponding to a container
* @param containerId the container ID
* @throws IOException
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerLogsUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerLogsUtils.java
index 319f49b..b1801a1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerLogsUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerLogsUtils.java
@@ -65,7 +65,7 @@
// aggregation to false, container log view request is forwarded to NM. NM
// does not have completed container information,but still NM serve request for
// reading container logs.
- if (container != null) {
+ if (container != null && container.getLogDir() == null) {
checkState(container.getContainerState());
}
@@ -95,7 +95,7 @@ public static File getContainerLogFile(ContainerId containerId,
Application application = getApplicationForContainer(containerId, context);
checkAccess(remoteUser, application, context);
- if (container != null) {
+ if (container != null && container.getLogDir() == null) {
checkState(container.getContainerState());
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java
index 3ff04d8..38325bc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java
@@ -102,7 +102,6 @@ public void handle(LocalizationEvent event) {
case CLEANUP_CONTAINER_RESOURCES:
Container container =
((ContainerLocalizationEvent) event).getContainer();
- // TODO: delete the container dir
this.dispatcher.getEventHandler().handle(
new ContainerEvent(container.getContainerId(),
ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP));
@@ -191,14 +190,6 @@ public void setBlockNewContainerRequests(boolean blockNewContainerRequests) {
}
@Override
- protected void authorizeStartAndResourceIncreaseRequest(
- NMTokenIdentifier nmTokenIdentifier,
- ContainerTokenIdentifier containerTokenIdentifier,
- boolean startRequest) throws YarnException {
- // do nothing
- }
-
- @Override
protected void authorizeGetAndStopContainerRequest(ContainerId containerId,
Container container, boolean stopRequest, NMTokenIdentifier identifier) throws YarnException {
// do nothing
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java
index 370a207..c8c5b56 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java
@@ -567,10 +567,12 @@ public void initApplication() {
public void initContainer(int containerNum) {
if (containerNum == -1) {
for (int i = 0; i < containers.size(); i++) {
- app.handle(new ApplicationContainerInitEvent(containers.get(i)));
+ app.handle( // containerNum == -1: one init event per listed container
+ new ApplicationContainerInitEvent(containers.get(i), true));
}
} else {
- app.handle(new ApplicationContainerInitEvent(containers.get(containerNum)));
+ app.handle(new ApplicationContainerInitEvent(
+ containers.get(containerNum), true)); // just this index
}
drainDispatcherEvents();
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
index 4652245..7e2e87d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
@@ -28,6 +28,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.protocolrecords.InitializeContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -118,6 +119,7 @@ public synchronized void removeApplication(ApplicationId appId)
rcsCopy.setRemainingRetryAttempts(rcs.getRemainingRetryAttempts());
rcsCopy.setWorkDir(rcs.getWorkDir());
rcsCopy.setLogDir(rcs.getLogDir());
+ rcsCopy.setDestroyDelay(rcs.getDestroyDelay());
result.add(rcsCopy);
}
return result;
@@ -192,6 +194,13 @@ public void storeContainerLogDir(ContainerId containerId,
}
@Override
+ public void storeContainerDestroyDelay(ContainerId containerId,
+ long destroyDelay) throws IOException { // updates the looked-up state
+ RecoveredContainerState rcs = getRecoveredContainerState(containerId);
+ rcs.setDestroyDelay(destroyDelay); // copied onto rcsCopy when listed
+ }
+
+ @Override
public synchronized void removeContainer(ContainerId containerId)
throws IOException {
containerStates.remove(containerId);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
index ccc9254..cc673fa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
@@ -346,6 +346,14 @@ public void testContainerStorage() throws IOException {
assertEquals("/test/workdir", rcs.getWorkDir());
assertEquals("/test/logdir", rcs.getLogDir());
+ // store the destroy delay and verify it is recovered after a restart
+ stateStore.storeContainerDestroyDelay(containerId, 32000);
+ restartStateStore();
+ recoveredContainers = stateStore.loadContainersState();
+ assertEquals(1, recoveredContainers.size());
+ rcs = recoveredContainers.get(0);
+ assertEquals(32000, rcs.getDestroyDelay());
+
// remove the container and verify not recovered
stateStore.removeContainer(containerId);
restartStateStore();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
index 0b95dba..214ca92 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
@@ -172,4 +172,28 @@ public String getLogDir() {
@Override
public void setLogDir(String logDir) {
}
+
+ @Override
+ public boolean shouldStartAfterInit() {
+ return false; // mock: fixed answer, no state behind it
+ }
+
+ @Override
+ public void setStartAfterInit(boolean startAfterInit) {
+ } // no-op: the argument is ignored
+
+ @Override
+ public long getDestroyDelay() {
+ return 0; // mock: fixed answer, always zero
+ }
+
+ @Override
+ public boolean allowsRestart() {
+ return false; // mock: fixed answer, no state behind it
+ }
+
+ @Override
+ public boolean isAwaitingStart() {
+ return false; // mock: fixed answer, no state behind it
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java
index b4ebf92..7b0e872 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java
@@ -27,12 +27,16 @@
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.InitializeContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.InitializeContainersResponse;
import org.junit.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.DestroyContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.DestroyContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
@@ -304,6 +308,18 @@ public IncreaseContainersResourceResponse increaseContainersResource(
return null;
}
+ @Override
+ public InitializeContainersResponse initializeContainers(
+ InitializeContainersRequest request) throws YarnException, IOException {
+ return null; // stub: request is ignored, no response is built
+ }
+
+ @Override
+ public DestroyContainersResponse destroyContainers(
+ DestroyContainersRequest request) throws YarnException, IOException {
+ return null; // stub: request is ignored, no response is built
+ }
+
public static org.apache.hadoop.yarn.server.api.records.NodeStatus
createNodeStatus(NodeId nodeId, List containers) {
RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAMAuthorization.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAMAuthorization.java
index 2787f1e..92d29dd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAMAuthorization.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAMAuthorization.java
@@ -42,6 +42,10 @@
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.InitializeContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.InitializeContainersResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.DestroyContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.DestroyContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
@@ -130,6 +134,18 @@ public IncreaseContainersResourceResponse increaseContainersResource(IncreaseCon
return IncreaseContainersResourceResponse.newInstance(null, null);
}
+ @Override
+ public InitializeContainersResponse initializeContainers(
+ InitializeContainersRequest request) throws YarnException, IOException {
+ return InitializeContainersResponse.newInstance(null, null, null);
+ } // request is ignored; the response is built from all-null arguments
+
+ @Override
+ public DestroyContainersResponse destroyContainers(
+ DestroyContainersRequest request) throws YarnException, IOException {
+ return DestroyContainersResponse.newInstance(null, null);
+ } // request is ignored; the response is built from all-null arguments
+
public Credentials getContainerCredentials() throws IOException {
Credentials credentials = new Credentials();
DataInputByteBuffer buf = new DataInputByteBuffer();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java
index 8cdb191..e3d9da3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java
@@ -33,8 +33,12 @@
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.DestroyContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.DestroyContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.InitializeContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.InitializeContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
@@ -145,6 +149,18 @@ public IncreaseContainersResourceResponse increaseContainersResource(
throws YarnException {
return null;
}
+
+ @Override
+ public InitializeContainersResponse initializeContainers(
+ InitializeContainersRequest request) throws YarnException, IOException {
+ return null; // stub: request is ignored, no response is built
+ }
+
+ @Override
+ public DestroyContainersResponse destroyContainers(
+ DestroyContainersRequest request) throws YarnException, IOException {
+ return null; // stub: request is ignored, no response is built
+ }
}
@Test