diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java index 6c3a4d6..cc0dd2b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java @@ -32,6 +32,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest; import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse; +import org.apache.hadoop.yarn.api.protocolrecords.InitializeContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.InitializeContainersResponse; import org.junit.Assert; import org.apache.commons.logging.Log; @@ -56,6 +58,8 @@ import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.DestroyContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.DestroyContainersResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; @@ -73,6 +77,7 @@ import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy; import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy.ContainerManagementProtocolProxyData; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import 
org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC; @@ -460,5 +465,21 @@ public IncreaseContainersResourceResponse increaseContainersResource( "Dummy function cause")); throw new IOException(e); } + + @Override + public InitializeContainersResponse initializeContainers( + InitializeContainersRequest request) throws YarnException, IOException { + Exception e = new Exception("Dummy function", new Exception( + "Dummy function cause")); + throw new IOException(e); + } + + @Override + public DestroyContainersResponse destroyContainers( + DestroyContainersRequest request) throws YarnException, IOException { + Exception e = new Exception("Dummy function", new Exception( + "Dummy function cause")); + throw new IOException(e); + } } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java index 610448c..2183d38 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java @@ -48,6 +48,10 @@ import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest; import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse; +import org.apache.hadoop.yarn.api.protocolrecords.InitializeContainersRequest; +import 
org.apache.hadoop.yarn.api.protocolrecords.InitializeContainersResponse; +import org.apache.hadoop.yarn.api.protocolrecords.DestroyContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.DestroyContainersResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; @@ -463,6 +467,18 @@ public IncreaseContainersResourceResponse increaseContainersResource( } @Override + public InitializeContainersResponse initializeContainers( + InitializeContainersRequest request) throws YarnException, IOException { + return null; + } + + @Override + public DestroyContainersResponse destroyContainers( + DestroyContainersRequest request) throws YarnException, IOException { + return null; + } + + @Override public void close() throws IOException { } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ContainerManagementProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ContainerManagementProtocol.java index 43e1d4c..1362920 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ContainerManagementProtocol.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ContainerManagementProtocol.java @@ -25,6 +25,11 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest; import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse; +import org.apache.hadoop.yarn.api.protocolrecords.InitializeContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.InitializeContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.InitializeContainersResponse; +import 
org.apache.hadoop.yarn.api.protocolrecords.DestroyContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.DestroyContainersResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; @@ -194,4 +199,87 @@ GetContainerStatusesResponse getContainerStatuses( IncreaseContainersResourceResponse increaseContainersResource( IncreaseContainersResourceRequest request) throws YarnException, IOException; + + /** + *

+ * The ApplicationMaster provides a list of + * {@link InitializeContainerRequest}s to a NodeManager to + * initialize {@link Container}s allocated to it using this + * interface. + *

+ * + *

+ * The ApplicationMaster has to provide details such as allocated + * resource capability, security tokens (if enabled), command to be executed + * to initialize the container, environment for the process, necessary + * binaries/jar/shared-objects etc. via the {@link ContainerLaunchContext} in + * the {@link InitializeContainerRequest}. + *

+ * + *

+ * The NodeManager sends a response via + * {@link InitializeContainersResponse} which includes a list of + * {@link ContainerId}s of successfully initialized {@link Container}s, a + * containerId-to-exception map for each failed + * {@link InitializeContainerRequest} in which the exception indicates + * per-container errors, and an allServicesMetaData map between the names of + * auxiliary services and their corresponding meta-data. Note: + * Non-container-specific exceptions will still be thrown by the API method + * itself. + *

+ *

+ * The ApplicationMaster can use + * {@link #getContainerStatuses(GetContainerStatusesRequest)} to get updated + * statuses of the to-be-initialized or initialized containers. + *

+ * + * @param request request to initialize a list of containers + * @return response including containerIds of all successfully initialized + * containers, a containerId-to-exception map for failed requests and + * an allServicesMetaData map. + * @throws YarnException + * @throws IOException + * @throws NMNotYetReadyException This exception is thrown when NM starts from + * scratch but has not yet connected with RM. + */ + @Public + @Unstable + InitializeContainersResponse initializeContainers( + InitializeContainersRequest request) throws YarnException, IOException; + + /** + *

+ * The ApplicationMaster requests a NodeManager to + * destroy a list of {@link Container}s allocated to it using this + * interface. + *

+ * + *

+ * The ApplicationMaster sends a {@link DestroyContainersRequest} + * which includes the {@link ContainerId}s of the containers to be destroyed. + *

+ * + *

+ * The NodeManager sends a response via + * {@link DestroyContainersResponse} which includes a list of + * {@link ContainerId}s of successfully destroyed containers, a + * containerId-to-exception map for each failed request in which the exception + * indicates per-container errors. Note: Non-container-specific + * exceptions will still be thrown by the API method itself. + * ApplicationMaster can use + * {@link #getContainerStatuses(GetContainerStatusesRequest)} to get updated + * statuses of the containers. + *

+ * + * @param request request to destroy a list of containers + * @return response which includes a list of containerIds of successfully + * destroyed containers, a containerId-to-exception map for failed + * requests. + * @throws YarnException + * @throws IOException + */ + @Public + @Unstable + DestroyContainersResponse destroyContainers(DestroyContainersRequest request) + throws YarnException, IOException; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerExitStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerExitStatus.java index f88fa3b..9920e73 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerExitStatus.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerExitStatus.java @@ -58,7 +58,7 @@ public static final int KILLED_EXCEEDED_PMEM = -104; /** - * Container was terminated by stop request by the app master. + * Container was terminated by stop/destroy request by the app master. */ public static final int KILLED_BY_APPMASTER = -105; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java index 582389f..65e8983 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java @@ -37,5 +37,8 @@ COMPLETE, /** Queued at the NM. 
*/ - QUEUED + QUEUED, + + /** Waiting to be started */ + AWAITING_START } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index a4213ce..aa20f0e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -854,6 +854,15 @@ public static boolean isAclEnabled(Configuration conf) { NM_PREFIX + "container-diagnostics-maximum-size"; public static final int DEFAULT_NM_CONTAINER_DIAGNOSTICS_MAXIMUM_SIZE = 10000; + /** + * Default time to wait before destroying a container after its execution is + * completed. + **/ + public static final String NM_CONTAINER_DESTROY_DELAY_MS = + NM_PREFIX + "container-destroy-delay-ms"; + public static final int DEFAULT_NM_CONTAINER_DESTROY_DELAY_MS = + 10 * 60 * 1000; + /** Interval at which the delayed token removal thread runs */ public static final String RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS = RM_PREFIX + "delayed.delegation-token.removal-interval-ms"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/containermanagement_protocol.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/containermanagement_protocol.proto index f06f6cb..5bd41e2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/containermanagement_protocol.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/containermanagement_protocol.proto @@ -35,4 +35,6 @@ service ContainerManagementProtocolService { rpc stopContainers(StopContainersRequestProto) returns (StopContainersResponseProto); rpc getContainerStatuses(GetContainerStatusesRequestProto) returns 
(GetContainerStatusesResponseProto); rpc increaseContainersResource(IncreaseContainersResourceRequestProto) returns (IncreaseContainersResourceResponseProto); + rpc initializeContainers(InitializeContainersRequestProto) returns (InitializeContainersResponseProto); + rpc destroyContainers(DestroyContainersRequestProto) returns (DestroyContainersResponseProto); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index 60cdfd1..e5c193d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -83,6 +83,7 @@ enum ContainerStateProto { C_RUNNING = 2; C_COMPLETE = 3; C_QUEUED = 4; + C_AWAITING_START = 5; } message ContainerProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index bdf022f..51200ab 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -262,6 +262,11 @@ message StopContainerRequestProto { message StopContainerResponseProto { } +message InitializeContainerRequestProto { + optional ContainerLaunchContextProto container_launch_context = 1; + optional hadoop.common.TokenProto container_token = 2; + optional int64 destroyDelay = 3; +} //// bulk API records message StartContainersRequestProto { @@ -306,6 +311,25 @@ message IncreaseContainersResourceResponseProto { repeated ContainerExceptionMapProto failed_requests = 2; } +message InitializeContainersRequestProto { + repeated InitializeContainerRequestProto initialize_container_request = 1; +} + +message InitializeContainersResponseProto { + repeated StringBytesMapProto 
services_meta_data = 1; + repeated ContainerIdProto succeeded_requests = 2; + repeated ContainerExceptionMapProto failed_requests = 3; +} + +message DestroyContainersRequestProto { + repeated ContainerIdProto container_id = 1; +} + +message DestroyContainersResponseProto { + repeated ContainerIdProto succeeded_requests = 1; + repeated ContainerExceptionMapProto failed_requests = 2; +} + ////////////////////////////////////////////////////// /////// Application_History_Protocol ///////////////// ////////////////////////////////////////////////////// diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java index 47270f5..53e76e8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java @@ -165,4 +165,36 @@ public NMTokenCache getNMTokenCache() { return nmTokenCache; } + /** + *

Initialize an allocated container.

+ * + *

The ApplicationMaster or other applications that use the + * client must provide the details of the allocated container, including the + * Id, the assigned node's Id and the token via {@link Container}. In + * addition, the AM needs to provide the {@link ContainerLaunchContext} as + * well.

+ * + * @param container the allocated container + * @param containerLaunchContext the context information needed by the + * NodeManager to initialize the + * container + * @return a map between the auxiliary service names and their outputs + * @throws YarnException + * @throws IOException + */ + public abstract Map initializeContainer(Container container, + ContainerLaunchContext containerLaunchContext) + throws YarnException, IOException; + + /** + *

Destroy a started container.

+ * + * @param containerId the Id of the container to destroy + * @param nodeId the Id of the NodeManager + * + * @throws YarnException + * @throws IOException + */ + public abstract void destroyContainer(ContainerId containerId, NodeId nodeId) + throws YarnException, IOException; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java index dc92cda..10820ad 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java @@ -33,10 +33,15 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.token.SecretManager.InvalidToken; +import org.apache.hadoop.yarn.api.protocolrecords.DestroyContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.DestroyContainersResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest; import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse; +import org.apache.hadoop.yarn.api.protocolrecords.InitializeContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.InitializeContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.InitializeContainersResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; @@ -115,7 +120,7 @@ 
protected void serviceStop() throws Exception { protected synchronized void cleanupRunningContainers() { for (StartedContainer startedContainer : startedContainers.values()) { try { - stopContainer(startedContainer.getContainerId(), + destroyContainer(startedContainer.getContainerId(), startedContainer.getNodeId()); } catch (YarnException e) { LOG.error("Failed to stop Container " + @@ -147,12 +152,14 @@ public void cleanupRunningContainersOnStop(boolean enabled) { private ContainerId containerId; private NodeId nodeId; private ContainerState state; + private boolean multiStart; - - public StartedContainer(ContainerId containerId, NodeId nodeId) { + public StartedContainer(ContainerId containerId, NodeId nodeId, + boolean multiStart) { this.containerId = containerId; this.nodeId = nodeId; state = ContainerState.NEW; + this.multiStart = multiStart; } public ContainerId getContainerId() { @@ -162,15 +169,26 @@ public ContainerId getContainerId() { public NodeId getNodeId() { return nodeId; } + + public boolean allowsRestart() { + return multiStart; + } } - private void addStartingContainer(StartedContainer startedContainer) + private StartedContainer addStartingContainer(StartedContainer startedContainer) throws YarnException { - if (startedContainers.putIfAbsent(startedContainer.containerId, - startedContainer) != null) { - throw RPCUtil.getRemoteException("Container " - + startedContainer.containerId.toString() + " is already started"); + StartedContainer previousStartedContainer = startedContainers + .putIfAbsent(startedContainer.containerId, startedContainer); + + if (previousStartedContainer != null) { + if(!previousStartedContainer.allowsRestart()) { + throw RPCUtil.getRemoteException("Container " + + startedContainer.containerId.toString() + " is already started"); + } + return previousStartedContainer; } + + return startedContainer; } @Override @@ -181,49 +199,58 @@ private void addStartingContainer(StartedContainer startedContainer) // between startContainer 
and stopContainer only when startContainer is // in progress for a given container. StartedContainer startingContainer = - new StartedContainer(container.getId(), container.getNodeId()); + new StartedContainer(container.getId(), container.getNodeId(), false); synchronized (startingContainer) { - addStartingContainer(startingContainer); - - Map allServiceResponse; - ContainerManagementProtocolProxyData proxy = null; - try { - proxy = - cmProxy.getProxy(container.getNodeId().toString(), - container.getId()); - StartContainerRequest scRequest = - StartContainerRequest.newInstance(containerLaunchContext, - container.getContainerToken()); - List list = new ArrayList(); - list.add(scRequest); - StartContainersRequest allRequests = - StartContainersRequest.newInstance(list); - StartContainersResponse response = - proxy - .getContainerManagementProtocol().startContainers(allRequests); - if (response.getFailedRequests() != null - && response.getFailedRequests().containsKey(container.getId())) { - Throwable t = - response.getFailedRequests().get(container.getId()).deSerialize(); - parseAndThrowException(t); - } - allServiceResponse = response.getAllServicesMetaData(); - startingContainer.state = ContainerState.RUNNING; - } catch (YarnException | IOException e) { - startingContainer.state = ContainerState.COMPLETE; - // Remove the started container if it failed to start - startedContainers.remove(startingContainer.containerId); - throw e; - } catch (Throwable t) { - startingContainer.state = ContainerState.COMPLETE; - startedContainers.remove(startingContainer.containerId); - throw RPCUtil.getRemoteException(t); - } finally { - if (proxy != null) { - cmProxy.mayBeCloseProxy(proxy); + startingContainer = addStartingContainer(startingContainer); + // Do synchronization on the actual StartedContainer stored in + // startedContainers map to prevent race condition between startContainer + // and initializeContainer when multiple container start/init are allowed + synchronized 
(startingContainer) { + + Map allServiceResponse; + ContainerManagementProtocolProxyData proxy = null; + try { + proxy = + cmProxy.getProxy(container.getNodeId().toString(), + container.getId()); + StartContainerRequest scRequest = + StartContainerRequest.newInstance(containerLaunchContext, + container.getContainerToken()); + List list = new ArrayList(); + list.add(scRequest); + StartContainersRequest allRequests = + StartContainersRequest.newInstance(list); + StartContainersResponse response = + proxy + .getContainerManagementProtocol().startContainers(allRequests); + if (response.getFailedRequests() != null + && response.getFailedRequests().containsKey(container.getId())) { + Throwable t = + response.getFailedRequests().get(container.getId()).deSerialize(); + parseAndThrowException(t); + } + allServiceResponse = response.getAllServicesMetaData(); + + startingContainer.state = ContainerState.RUNNING; + } catch (YarnException | IOException e) { + if(!startingContainer.allowsRestart()) { + startingContainer.state = ContainerState.COMPLETE; + // Remove the started container if it failed to start + startedContainers.remove(startingContainer.containerId); + } + // TODO better handling of exceptions when multiStart is enabled + throw e; + } catch (Throwable t) { + startingContainer.state = ContainerState.COMPLETE; + startedContainers.remove(startingContainer.containerId); + throw RPCUtil.getRemoteException(t); + } finally { + if (proxy != null) { + cmProxy.mayBeCloseProxy(proxy); + } } + return allServiceResponse; } - return allServiceResponse; } } @@ -269,9 +296,12 @@ public void stopContainer(ContainerId containerId, NodeId nodeId) return; } stopContainerInternal(containerId, nodeId); - // Only after successful - startedContainer.state = ContainerState.COMPLETE; - startedContainers.remove(startedContainer.containerId); + if(startedContainer.allowsRestart()) { + startedContainer.state = ContainerState.AWAITING_START; + } else { + startedContainer.state = 
ContainerState.COMPLETE; + startedContainers.remove(startedContainer.containerId); + } } } else { stopContainerInternal(containerId, nodeId); @@ -306,6 +336,89 @@ public ContainerStatus getContainerStatus(ContainerId containerId, } } + @Override + public Map initializeContainer(Container container, + ContainerLaunchContext containerLaunchContext) + throws YarnException, IOException { + + StartedContainer startingContainer = + new StartedContainer(container.getId(), container.getNodeId(), true); + synchronized (startingContainer) { + startingContainer = addStartingContainer(startingContainer); + // Do synchronization on the actual StartedContainer stored in + // startedContainers map to prevent race condition between startContainer + // and initializeContainer + synchronized (startingContainer) { + + if (!startingContainer.allowsRestart()) { + throw RPCUtil.getRemoteException( + "Container " + startingContainer.containerId.toString() + + " already initialized and does not allow reinitialization"); + } + + Map allServiceResponse; + ContainerManagementProtocolProxyData proxy = null; + try { + proxy = cmProxy.getProxy(container.getNodeId().toString(), + container.getId()); + InitializeContainerRequest icRequest = + InitializeContainerRequest.newInstance(containerLaunchContext, + container.getContainerToken()); + List list = + new ArrayList(); + list.add(icRequest); + InitializeContainersRequest allRequests = + InitializeContainersRequest.newInstance(list); + InitializeContainersResponse response = + proxy.getContainerManagementProtocol() + .initializeContainers(allRequests); + if (response.getFailedRequests() != null + && response.getFailedRequests().containsKey(container.getId())) { + Throwable t = response.getFailedRequests().get(container.getId()) + .deSerialize(); + parseAndThrowException(t); + } + allServiceResponse = response.getAllServicesMetaData(); + + startingContainer.state = ContainerState.AWAITING_START; + } catch (YarnException | IOException e) { + 
startingContainer.state = ContainerState.COMPLETE; + // Remove the started container if it failed to initialize + // TODO check if this behavior is correct + startedContainers.remove(startingContainer.containerId); + throw e; + } catch (Throwable t) { + startingContainer.state = ContainerState.COMPLETE; + startedContainers.remove(startingContainer.containerId); + throw RPCUtil.getRemoteException(t); + } finally { + if (proxy != null) { + cmProxy.mayBeCloseProxy(proxy); + } + } + return allServiceResponse; + } + } + } + + @Override + public void destroyContainer(ContainerId containerId, NodeId nodeId) + throws YarnException, IOException { + StartedContainer startedContainer = startedContainers.get(containerId); + + // Only allow one request of destroy container to move forward + // When entering the block + if (startedContainer != null) { + synchronized (startedContainer) { + destroyContainerInternal(containerId, nodeId); + startedContainer.state = ContainerState.COMPLETE; + startedContainers.remove(startedContainer.containerId); + } + } else { + destroyContainerInternal(containerId, nodeId); + } + } + private void stopContainerInternal(ContainerId containerId, NodeId nodeId) throws IOException, YarnException { ContainerManagementProtocolProxyData proxy = null; @@ -329,6 +442,29 @@ private void stopContainerInternal(ContainerId containerId, NodeId nodeId) } } + private void destroyContainerInternal(ContainerId containerId, NodeId nodeId) + throws IOException, YarnException { + ContainerManagementProtocolProxyData proxy = null; + List containerIds = new ArrayList(); + containerIds.add(containerId); + try { + proxy = cmProxy.getProxy(nodeId.toString(), containerId); + DestroyContainersResponse response = + proxy.getContainerManagementProtocol().destroyContainers( + DestroyContainersRequest.newInstance(containerIds)); + if (response.getFailedRequests() != null + && response.getFailedRequests().containsKey(containerId)) { + Throwable t = + 
response.getFailedRequests().get(containerId).deSerialize(); + parseAndThrowException(t); + } + } finally { + if (proxy != null) { + cmProxy.mayBeCloseProxy(proxy); + } + } + } + public AtomicBoolean getCleanupRunningContainers() { return cleanupRunningContainers; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagementProtocolPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagementProtocolPBClientImpl.java index ce18bde..e4975b3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagementProtocolPBClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagementProtocolPBClientImpl.java @@ -32,6 +32,10 @@ import org.apache.hadoop.yarn.api.ContainerManagementProtocolPB; import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest; import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse; +import org.apache.hadoop.yarn.api.protocolrecords.InitializeContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.InitializeContainersResponse; +import org.apache.hadoop.yarn.api.protocolrecords.DestroyContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.DestroyContainersResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; @@ -40,6 +44,10 @@ import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.IncreaseContainersResourceRequestPBImpl; import 
org.apache.hadoop.yarn.api.protocolrecords.impl.pb.IncreaseContainersResourceResponsePBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.InitializeContainersRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.InitializeContainersResponsePBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.DestroyContainersRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.DestroyContainersResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerStatusesRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerStatusesResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainersRequestPBImpl; @@ -49,10 +57,12 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.ipc.RPCUtil; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.DestroyContainersRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerStatusesRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainersRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.StopContainersRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.IncreaseContainersResourceRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.InitializeContainersRequestProto; import com.google.protobuf.ServiceException; @@ -148,4 +158,33 @@ public IncreaseContainersResourceResponse increaseContainersResource( return null; } } + + @Override + public InitializeContainersResponse + initializeContainers(InitializeContainersRequest requests) throws YarnException, + IOException { + InitializeContainersRequestProto requestProto = + ((InitializeContainersRequestPBImpl) requests).getProto(); + try { + return new InitializeContainersResponsePBImpl(proxy.initializeContainers(null, + requestProto)); + } catch (ServiceException e) { + 
RPCUtil.unwrapAndThrowException(e); + return null; + } + } + + @Override + public DestroyContainersResponse destroyContainers(DestroyContainersRequest requests) + throws YarnException, IOException { + DestroyContainersRequestProto requestProto = + ((DestroyContainersRequestPBImpl) requests).getProto(); + try { + return new DestroyContainersResponsePBImpl(proxy.destroyContainers(null, + requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + return null; + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ContainerManagementProtocolPBServiceImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ContainerManagementProtocolPBServiceImpl.java index 7626441..c5e6888 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ContainerManagementProtocolPBServiceImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ContainerManagementProtocolPBServiceImpl.java @@ -24,11 +24,17 @@ import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.ContainerManagementProtocolPB; import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse; +import org.apache.hadoop.yarn.api.protocolrecords.InitializeContainersResponse; +import org.apache.hadoop.yarn.api.protocolrecords.DestroyContainersResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.IncreaseContainersResourceRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.IncreaseContainersResourceResponsePBImpl; 
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.InitializeContainersRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.InitializeContainersResponsePBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.DestroyContainersRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.DestroyContainersResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerStatusesRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerStatusesResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainersRequestPBImpl; @@ -38,6 +44,10 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.proto.YarnServiceProtos.IncreaseContainersResourceRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.IncreaseContainersResourceResponseProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.InitializeContainersRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.InitializeContainersResponseProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.DestroyContainersRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.DestroyContainersResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerStatusesRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerStatusesResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainersRequestProto; @@ -116,4 +126,32 @@ public IncreaseContainersResourceResponseProto increaseContainersResource( throw new ServiceException(e); } } + + @Override + public InitializeContainersResponseProto initializeContainers(RpcController arg0, + InitializeContainersRequestProto proto) throws ServiceException { + InitializeContainersRequestPBImpl request = new InitializeContainersRequestPBImpl(proto); + try { + InitializeContainersResponse response = real.initializeContainers(request); + 
return ((InitializeContainersResponsePBImpl)response).getProto(); + } catch (YarnException e) { + throw new ServiceException(e); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public DestroyContainersResponseProto destroyContainers(RpcController arg0, + DestroyContainersRequestProto proto) throws ServiceException { + DestroyContainersRequestPBImpl request = new DestroyContainersRequestPBImpl(proto); + try { + DestroyContainersResponse response = real.destroyContainers(request); + return ((DestroyContainersResponsePBImpl)response).getProto(); + } catch (YarnException e) { + throw new ServiceException(e); + } catch (IOException e) { + throw new ServiceException(e); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java index 0a19783..1660f17 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java @@ -33,6 +33,10 @@ import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest; import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse; +import org.apache.hadoop.yarn.api.protocolrecords.InitializeContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.InitializeContainersResponse; +import org.apache.hadoop.yarn.api.protocolrecords.DestroyContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.DestroyContainersResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import 
org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; @@ -174,5 +178,17 @@ public IncreaseContainersResourceResponse increaseContainersResource( IncreaseContainersResourceRequest request) throws YarnException, IOException { return null; } + + @Override + public InitializeContainersResponse initializeContainers( + InitializeContainersRequest request) throws YarnException, IOException { + return null; + } + + @Override + public DestroyContainersResponse destroyContainers( + DestroyContainersRequest request) throws YarnException, IOException { + return null; + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerResourceIncreaseRPC.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerResourceIncreaseRPC.java index 50ff1e0..4385275 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerResourceIncreaseRPC.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerResourceIncreaseRPC.java @@ -25,10 +25,14 @@ import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.DestroyContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.DestroyContainersResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest; import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse; +import org.apache.hadoop.yarn.api.protocolrecords.InitializeContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.InitializeContainersResponse; import 
org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest; @@ -158,5 +162,17 @@ public IncreaseContainersResourceResponse increaseContainersResource( } throw new YarnException("Shouldn't happen!!"); } + + @Override + public InitializeContainersResponse initializeContainers( + InitializeContainersRequest request) throws YarnException, IOException { + return null; + } + + @Override + public DestroyContainersResponse destroyContainers( + DestroyContainersRequest request) throws YarnException, IOException { + return null; + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java index e718661..f589fe1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java @@ -35,6 +35,10 @@ import org.apache.hadoop.yarn.api.ContainerManagementProtocolPB; import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest; import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse; +import org.apache.hadoop.yarn.api.protocolrecords.InitializeContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.InitializeContainersResponse; +import org.apache.hadoop.yarn.api.protocolrecords.DestroyContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.DestroyContainersResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; @@ -227,6 +231,18 @@ public 
IncreaseContainersResourceResponse increaseContainersResource( IncreaseContainersResourceRequest request) throws YarnException, IOException { return null; } + + @Override + public InitializeContainersResponse initializeContainers( + InitializeContainersRequest request) throws YarnException, IOException { + return null; + } + + @Override + public DestroyContainersResponse destroyContainers( + DestroyContainersRequest request) throws YarnException, IOException { + return null; + } } public static ContainerTokenIdentifier newContainerTokenIdentifier( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NMAuditLogger.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NMAuditLogger.java index cb4021f..67249e4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NMAuditLogger.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NMAuditLogger.java @@ -43,8 +43,10 @@ static final char PAIR_SEPARATOR = '\t'; // Some commonly used descriptions + public static final String INITIALIZE_CONTAINER = "Initialize Container Request"; public static final String START_CONTAINER = "Start Container Request"; public static final String STOP_CONTAINER = "Stop Container Request"; + public static final String DESTROY_CONTAINER = "Destroy Container Request"; public static final String FINISH_SUCCESS_CONTAINER = "Container Finished - Succeeded"; public static final String FINISH_FAILED_CONTAINER = "Container Finished - Failed"; public static final String FINISH_KILLED_CONTAINER = "Container Finished - Killed"; diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index c0f02e9..0cb55ca 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -527,6 +527,11 @@ private ResourceUtilization getNodeUtilization() { // subsequent call to stop container it will get removed from cache. addCompletedContainer(containerId); } else { + + // notify containers in AWAITING_START as running to resource manager + if(containerStatus.getState() == ContainerState.AWAITING_START) { + containerStatus.setState(ContainerState.RUNNING); + } containerStatuses.add(containerStatus); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 162823c..d8c60aa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -56,10 +56,15 @@ import org.apache.hadoop.service.ServiceStateChangeListener; 
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.DestroyContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.DestroyContainersResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest; import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse; +import org.apache.hadoop.yarn.api.protocolrecords.InitializeContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.InitializeContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.InitializeContainersResponse; import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; @@ -120,7 +125,10 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerInitEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerStartEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerStopEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType; import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.SignalContainersLauncherEvent; @@ -357,7 +365,12 @@ private void recoverContainer(RecoveredContainerState rcs) credentials, metrics, token, context, rcs); context.getContainers().put(containerId, container); dispatcher.getEventHandler().handle( - new ApplicationContainerInitEvent(container)); + // For containers that do not allow multi start, the startAfterInit + // flag has no effect during recovering. However, we set startAfterInit + // to true in order the avoid the application master to restart + // or reinitialize the container until we check if the container is + // already running + new ApplicationContainerInitEvent(container, true)); } else { if (rcs.getStatus() != RecoveredContainerStatus.COMPLETED) { LOG.warn(containerId + " has no corresponding application!"); @@ -704,7 +717,7 @@ protected void authorizeUser(UserGroupInformation remoteUgi, protected void authorizeStartAndResourceIncreaseRequest( NMTokenIdentifier nmTokenIdentifier, ContainerTokenIdentifier containerTokenIdentifier, - boolean startRequest) + boolean startRequest, boolean isRestart) throws YarnException { if (nmTokenIdentifier == null) { throw RPCUtil.getRemoteException(INVALID_NMTOKEN_MSG); @@ -734,8 +747,9 @@ protected void authorizeStartAndResourceIncreaseRequest( + "container with container token") .append(" issued for application attempt : ") .append(containerId.getApplicationAttemptId()); - } else if (startRequest && !this.context.getContainerTokenSecretManager() - .isValidStartContainerRequest(containerTokenIdentifier)) { + } else if (startRequest && !isRestart + && !this.context.getContainerTokenSecretManager() + .isValidStartContainerRequest(containerTokenIdentifier)) { // Is the container being relaunched? Or RPC layer let startCall with // tokens generated off old-secret through? 
unauthorized = true; @@ -803,16 +817,28 @@ public StartContainersResponse startContainers( containerTokenIdentifier); containerId = containerTokenIdentifier.getContainerID(); + boolean isRestart = false; + Container containerFromContext = this.context.getContainers().get(containerId); + if(containerFromContext != null) { + isRestart = containerFromContext.allowsRestart(); + } + // Initialize the AMRMProxy service instance only if the container is of // type AM and if the AMRMProxy service is enabled - if (amrmProxyEnabled && containerTokenIdentifier.getContainerType() + if (!isRestart && amrmProxyEnabled && containerTokenIdentifier.getContainerType() .equals(ContainerType.APPLICATION_MASTER)) { this.getAMRMProxyService().processApplicationStartRequest(request); } - performContainerPreStartChecks(nmTokenIdentifier, request, - containerTokenIdentifier); - startContainerInternal(nmTokenIdentifier, containerTokenIdentifier, - request); + performContainerPreStartChecks(nmTokenIdentifier, + request.getContainerLaunchContext(), containerTokenIdentifier, + isRestart); + if(isRestart) { + restartContainerInternal(nmTokenIdentifier, containerTokenIdentifier, + request, containerFromContext); + } else { + startContainerInternal(nmTokenIdentifier, containerTokenIdentifier, + request); + } succeededContainers.add(containerId); } catch (YarnException e) { failedContainers.put(containerId, SerializedException.newInstance(e)); @@ -831,8 +857,8 @@ public StartContainersResponse startContainers( } private void performContainerPreStartChecks( - NMTokenIdentifier nmTokenIdentifier, StartContainerRequest request, - ContainerTokenIdentifier containerTokenIdentifier) + NMTokenIdentifier nmTokenIdentifier, ContainerLaunchContext launchContext, + ContainerTokenIdentifier containerTokenIdentifier, boolean isRestart) throws YarnException, InvalidToken { /* * 1) It should save the NMToken into NMTokenSecretManager. 
This is done @@ -847,25 +873,55 @@ private void performContainerPreStartChecks( * correct RMIdentifier. d) It is not expired. */ authorizeStartAndResourceIncreaseRequest( - nmTokenIdentifier, containerTokenIdentifier, true); + nmTokenIdentifier, containerTokenIdentifier, true, isRestart); // update NMToken updateNMTokenIdentifier(nmTokenIdentifier); - ContainerLaunchContext launchContext = request.getContainerLaunchContext(); - - Map serviceData = getAuxServiceMetaData(); - if (launchContext.getServiceData()!=null && - !launchContext.getServiceData().isEmpty()) { - for (Entry meta : launchContext.getServiceData() - .entrySet()) { - if (null == serviceData.get(meta.getKey())) { - throw new InvalidAuxServiceException("The auxService:" + meta.getKey() - + " does not exist"); + // Launch context could be null in case of a restart without reinitialization + if(launchContext != null || !isRestart) { + Map serviceData = getAuxServiceMetaData(); + if (launchContext.getServiceData()!=null && + !launchContext.getServiceData().isEmpty()) { + for (Entry meta : launchContext.getServiceData() + .entrySet()) { + if (null == serviceData.get(meta.getKey())) { + throw new InvalidAuxServiceException("The auxService:" + meta.getKey() + + " does not exist"); + } } } } } + private long verifyAndGetDestroyDelay( + InitializeContainerRequest request, + ContainerTokenIdentifier containerTokenIdentifier) + throws YarnException { + + ContainerId containerId = containerTokenIdentifier.getContainerID(); + String containerIdStr = containerId.toString(); + String user = containerTokenIdentifier.getApplicationSubmitter(); + + ApplicationId applicationID = + containerId.getApplicationAttemptId().getApplicationId(); + + long destroyDelay = request.getDestroyDelay(); + + if (destroyDelay < 0) { + NMAuditLogger.logFailure(user, AuditConstants.INITIALIZE_CONTAINER, + "ContainerManagerImpl", "Invalid destroy delay value", applicationID, + containerId); + throw RPCUtil.getRemoteException( + "Container " 
+ containerIdStr + " invalid destroy delay value"); + } else if (destroyDelay == 0) { + destroyDelay = this.getConfig().getLong( + YarnConfiguration.NM_CONTAINER_DESTROY_DELAY_MS, + YarnConfiguration.DEFAULT_NM_CONTAINER_DESTROY_DELAY_MS); + } + + return destroyDelay; + } + private ContainerManagerApplicationProto buildAppProto(ApplicationId appId, String user, Credentials credentials, Map appAcls, @@ -961,7 +1017,7 @@ protected void startContainerInternal(NMTokenIdentifier nmTokenIdentifier, this.context.getNMStateStore().storeContainer(containerId, request); dispatcher.getEventHandler().handle( - new ApplicationContainerInitEvent(container)); + new ApplicationContainerInitEvent(container, true)); this.context.getContainerTokenSecretManager().startContainerSuccessful( containerTokenIdentifier); @@ -981,6 +1037,171 @@ protected void startContainerInternal(NMTokenIdentifier nmTokenIdentifier, } } + @SuppressWarnings("unchecked") + protected void reinitializeContainerInternal( + NMTokenIdentifier nmTokenIdentifier, + ContainerTokenIdentifier containerTokenIdentifier, + InitializeContainerRequest request, Container previousContainer) + throws YarnException, IOException { + + ContainerId containerId = containerTokenIdentifier.getContainerID(); + String containerIdStr = containerId.toString(); + String user = containerTokenIdentifier.getApplicationSubmitter(); + + ApplicationId applicationID = + containerId.getApplicationAttemptId().getApplicationId(); + + LOG.info("Initialize request for " + containerIdStr + " by user " + user); + LOG.info("Reinitializing container " + containerIdStr); + + if(!previousContainer.allowsRestart()) { + NMAuditLogger.logFailure(user, AuditConstants.INITIALIZE_CONTAINER, + "ContainerManagerImpl", "Container does not allow reinitialization!", + applicationID, containerId); + throw RPCUtil.getRemoteException("Container " + containerIdStr + + " does not allow reinitialization!"); + } + if(!previousContainer.isAwaitingStart()) { + 
NMAuditLogger.logFailure(user, AuditConstants.INITIALIZE_CONTAINER, + "ContainerManagerImpl", "Container is not awaiting for reinitialization", + applicationID, containerId); + throw RPCUtil.getRemoteException("Container " + containerIdStr + + " is not awaiting for reinitialization"); + } + + StartContainerRequest startRequest = StartContainerRequest.newInstance( + request.getContainerLaunchContext(), request.getContainerToken()); + + dispatcher.getEventHandler() + .handle(new ContainerInitEvent(containerId, false, startRequest)); + + NMAuditLogger.logSuccess(user, AuditConstants.INITIALIZE_CONTAINER, + "ContainerManageImpl", applicationID, containerId); + } + + @SuppressWarnings("unchecked") + protected void initializeContainerInternal(NMTokenIdentifier nmTokenIdentifier, + ContainerTokenIdentifier containerTokenIdentifier, + InitializeContainerRequest request) throws YarnException, IOException { + + ContainerId containerId = containerTokenIdentifier.getContainerID(); + String containerIdStr = containerId.toString(); + String user = containerTokenIdentifier.getApplicationSubmitter(); + + ContainerLaunchContext launchContext = request.getContainerLaunchContext(); + + Credentials credentials = + YarnServerSecurityUtils.parseCredentials(launchContext); + + ApplicationId applicationID = + containerId.getApplicationAttemptId().getApplicationId(); + + LOG.info("Initialize request for " + containerIdStr + " by user " + user); + LOG.info("Initializing new container " + containerIdStr); + + Container container = new ContainerImpl(getConfig(), this.dispatcher, + launchContext, credentials, metrics, containerTokenIdentifier, context, + verifyAndGetDestroyDelay(request, containerTokenIdentifier)); + + // we already verified that the container is not present, coordination for + // new container insertion is guaranteed by lock on this.context + context.getContainers().put(containerId, container); + + this.readLock.lock(); + try { + if (!isServiceStopped()) { + // Create the 
application + Application application = new ApplicationImpl(dispatcher, user, + applicationID, credentials, context); + if (null == context.getApplications().putIfAbsent(applicationID, + application)) { + LOG.info("Creating a new application reference for app " + + applicationID); + LogAggregationContext logAggregationContext = + containerTokenIdentifier.getLogAggregationContext(); + Map appAcls = + container.getLaunchContext().getApplicationACLs(); + context.getNMStateStore().storeApplication(applicationID, + buildAppProto(applicationID, user, credentials, appAcls, + logAggregationContext)); + dispatcher.getEventHandler().handle( + new ApplicationInitEvent(applicationID, appAcls, + logAggregationContext)); + } + + StartContainerRequest startRequest = StartContainerRequest + .newInstance(request.getContainerLaunchContext(), + request.getContainerToken()); + + // TODO: make a single call to the state store + this.context.getNMStateStore().storeContainerDestroyDelay(containerId, + request.getDestroyDelay()); + this.context.getNMStateStore().storeContainer(containerId, startRequest); + dispatcher.getEventHandler().handle( + new ApplicationContainerInitEvent(container, false)); + + if (amrmProxyEnabled && containerTokenIdentifier.getContainerType() + .equals(ContainerType.APPLICATION_MASTER)) { + this.getAMRMProxyService().processApplicationStartRequest(startRequest); + } + + this.context.getContainerTokenSecretManager().startContainerSuccessful( + containerTokenIdentifier); + NMAuditLogger.logSuccess(user, AuditConstants.INITIALIZE_CONTAINER, + "ContainerManageImpl", applicationID, containerId); + + } else { + throw new YarnException( + "Container initialization failed as the NodeManager is " + + "in the process of shutting down"); + } + } finally { + this.readLock.unlock(); + } + + } + + @SuppressWarnings("unchecked") + protected void restartContainerInternal(NMTokenIdentifier nmTokenIdentifier, + ContainerTokenIdentifier containerTokenIdentifier, + 
StartContainerRequest request, Container container) + throws YarnException, IOException { + + ContainerId containerId = container.getContainerId(); + String containerIdStr = containerId.toString(); + String user = containerTokenIdentifier.getApplicationSubmitter(); + + ApplicationId applicationID = + containerId.getApplicationAttemptId().getApplicationId(); + + LOG.info("Start request for " + containerIdStr + " by user " + user); + + if(!container.isAwaitingStart()) { + NMAuditLogger.logFailure(user, AuditConstants.START_CONTAINER, + "ContainerManagerImpl", "Container is not awaiting for start", + applicationID, containerId); + throw RPCUtil.getRemoteException("Container " + containerIdStr + + " is not awaiting for start"); + } + + if(request.getContainerLaunchContext() == null) { + LOG.info("Restarting container " + containerIdStr); + dispatcher.getEventHandler().handle(new ContainerStartEvent(containerId)); + } else { + LOG.info("Reinitializing and starting container " + containerIdStr); + dispatcher.getEventHandler() + .handle(new ContainerInitEvent(containerId, true, request)); + } + + + NMAuditLogger.logSuccess(user, AuditConstants.START_CONTAINER, + "ContainerManageImpl", applicationID, containerId); + // TODO launchedContainer misplaced -> doesn't necessarily mean a container + // launch. A finished Application will not launch containers. 
+ metrics.launchedContainer(); + metrics.allocateContainer(containerTokenIdentifier.getResource()); + } + protected ContainerTokenIdentifier verifyAndGetContainerTokenIdentifier( org.apache.hadoop.yarn.api.records.Token token, ContainerTokenIdentifier containerTokenIdentifier) throws YarnException, @@ -1036,7 +1257,7 @@ public IncreaseContainersResourceResponse increaseContainersResource( verifyAndGetContainerTokenIdentifier(token, containerTokenIdentifier); authorizeStartAndResourceIncreaseRequest( - nmTokenIdentifier, containerTokenIdentifier, false); + nmTokenIdentifier, containerTokenIdentifier, false, false); containerId = containerTokenIdentifier.getContainerID(); // Reuse the startContainer logic to update NMToken, // as container resource increase request will have come with @@ -1166,7 +1387,11 @@ public StopContainersResponse stopContainers(StopContainersRequest requests) try { Container container = this.context.getContainers().get(id); authorizeGetAndStopContainerRequest(id, container, true, identifier); - stopContainerInternal(id); + if(container.allowsRestart()) { + stopContainerInternal(id); + } else { + destroyContainerInternal(id); + } succeededRequests.add(id); } catch (YarnException e) { failedRequests.put(id, SerializedException.newInstance(e)); @@ -1189,6 +1414,62 @@ protected void stopContainerInternal(ContainerId containerID) + " is not handled by this NodeManager"); } } else { + + // TODO: check container state before sending stop to avoid invalid + // event exceptions in state machine + + dispatcher.getEventHandler() + .handle(new ContainerStopEvent(containerID)); + + NMAuditLogger.logSuccess(container.getUser(), + AuditConstants.STOP_CONTAINER, "ContainerManageImpl", containerID + .getApplicationAttemptId().getApplicationId(), containerID); + } + } + + /** + * Destroy a list of containers running on this NodeManager. 
+ */ + @Override + public DestroyContainersResponse destroyContainers( + DestroyContainersRequest requests) throws YarnException, IOException { + + List succeededRequests = new ArrayList(); + Map failedRequests = + new HashMap(); + UserGroupInformation remoteUgi = getRemoteUgi(); + NMTokenIdentifier identifier = selectNMTokenIdentifier(remoteUgi); + if (identifier == null) { + throw RPCUtil.getRemoteException(INVALID_NMTOKEN_MSG); + } + for (ContainerId id : requests.getContainerIds()) { + try { + Container container = this.context.getContainers().get(id); + authorizeGetAndStopContainerRequest(id, container, true, identifier); + destroyContainerInternal(id); + succeededRequests.add(id); + } catch (YarnException e) { + failedRequests.put(id, SerializedException.newInstance(e)); + } + } + return DestroyContainersResponse + .newInstance(succeededRequests, failedRequests); + } + + @SuppressWarnings("unchecked") + protected void destroyContainerInternal(ContainerId containerID) + throws YarnException, IOException { + String containerIDStr = containerID.toString(); + Container container = this.context.getContainers().get(containerID); + LOG.info("Destroying container with container Id: " + containerIDStr); + + if (container == null) { + if (!nodeStatusUpdater.isContainerRecentlyStopped(containerID)) { + throw RPCUtil.getRemoteException("Container " + containerIDStr + + " is not handled by this NodeManager"); + } + } else { + context.getNMStateStore().storeContainerKilled(containerID); dispatcher.getEventHandler().handle( new ContainerKillEvent(containerID, @@ -1196,7 +1477,7 @@ protected void stopContainerInternal(ContainerId containerID) "Container killed by the ApplicationMaster.")); NMAuditLogger.logSuccess(container.getUser(), - AuditConstants.STOP_CONTAINER, "ContainerManageImpl", containerID + AuditConstants.DESTROY_CONTAINER, "ContainerManageImpl", containerID .getApplicationAttemptId().getApplicationId(), containerID); } } @@ -1286,6 +1567,75 @@ protected void 
authorizeGetAndStopContainerRequest(ContainerId containerId, } } + @Override + public InitializeContainersResponse initializeContainers( + InitializeContainersRequest requests) throws YarnException, IOException { + if (blockNewContainerRequests.get()) { + throw new NMNotYetReadyException( + "Rejecting new containers as NodeManager has not" + + " yet connected with ResourceManager"); + } + UserGroupInformation remoteUgi = getRemoteUgi(); + NMTokenIdentifier nmTokenIdentifier = selectNMTokenIdentifier(remoteUgi); + authorizeUser(remoteUgi, nmTokenIdentifier); + List succeededContainers = new ArrayList(); + Map failedContainers = + new HashMap(); + // Synchronize with NodeStatusUpdaterImpl#registerWithRM + // to avoid race condition during NM-RM resync (due to RM restart) while a + // container is being started, in particular when the container has not yet + // been added to the containers map in NMContext. + synchronized (this.context) { + for (InitializeContainerRequest request : requests + .getInitializeContainerRequests()) { + ContainerId containerId = null; + try { + if (request.getContainerToken() == null + || request.getContainerToken().getIdentifier() == null) { + throw new IOException(INVALID_CONTAINERTOKEN_MSG); + } + + ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils + .newContainerTokenIdentifier(request.getContainerToken()); + verifyAndGetContainerTokenIdentifier(request.getContainerToken(), + containerTokenIdentifier); + containerId = containerTokenIdentifier.getContainerID(); + + boolean isReinit = false; + Container previousContainer = this.context.getContainers().get(containerId); + if(previousContainer != null) { + isReinit = true; + } + + performContainerPreStartChecks(nmTokenIdentifier, + request.getContainerLaunchContext(), containerTokenIdentifier, + isReinit); + + if(isReinit) { + reinitializeContainerInternal(nmTokenIdentifier, + containerTokenIdentifier, request, previousContainer); + } else { + 
initializeContainerInternal(nmTokenIdentifier, + containerTokenIdentifier, request); + } + + succeededContainers.add(containerId); + } catch (YarnException e) { + failedContainers.put(containerId, SerializedException.newInstance(e)); + } catch (InvalidToken ie) { + failedContainers + .put(containerId, SerializedException.newInstance(ie)); + throw ie; + } catch (IOException e) { + throw RPCUtil.getRemoteException(e); + } + } + return InitializeContainersResponse + .newInstance(getAuxServiceMetaData(), succeededContainers, + failedContainers); + } + } + class ContainerEventDispatcher implements EventHandler { @Override public void handle(ContainerEvent event) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationContainerInitEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationContainerInitEvent.java index ef9e5a4..b874c90 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationContainerInitEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationContainerInitEvent.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.yarn.server.nodemanager.containermanager.application; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerInitEvent; @@ -34,14 +35,36 @@ */ 
public class ApplicationContainerInitEvent extends ApplicationEvent { final Container container; + final boolean startAfterInit; + private final ContainerLaunchContext updatedCtxt; - public ApplicationContainerInitEvent(Container container) { + public ApplicationContainerInitEvent(Container container, + boolean startAfterInit, ContainerLaunchContext updatedCtxt) { super(container.getContainerId().getApplicationAttemptId() .getApplicationId(), ApplicationEventType.INIT_CONTAINER); this.container = container; + this.startAfterInit = startAfterInit; + this.updatedCtxt = updatedCtxt; + } + + public ApplicationContainerInitEvent(Container container, + boolean startAfterInit) { + super(container.getContainerId().getApplicationAttemptId() + .getApplicationId(), ApplicationEventType.INIT_CONTAINER); + this.container = container; + this.startAfterInit = startAfterInit; + this.updatedCtxt = null; } Container getContainer() { return container; } + + public boolean shouldStartAfterInit() { + return this.startAfterInit; + } + + public ContainerLaunchContext getUpdatedContainerLaunchContext() { + return this.updatedCtxt; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationEventType.java index b4ba76a..04f30b5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationEventType.java @@ -23,6 +23,7 @@ // Source: 
ContainerManager INIT_APPLICATION, INIT_CONTAINER, + START_CONTAINER, FINISH_APPLICATION, // Source: LogAggregationService if init fails // Source: ResourceLocalizationService diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java index fbc8453..4c5aa19 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java @@ -21,7 +21,9 @@ import java.io.IOException; import java.util.EnumSet; import java.util.HashMap; +import java.util.LinkedList; import java.util.Map; +import java.util.Queue; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; @@ -41,6 +43,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerInitEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerStartEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent; import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType; @@ -78,6 +81,9 @@ Map containers = new HashMap(); + + Queue pendingContainerInitEvents = + new LinkedList(); public ApplicationImpl(Dispatcher dispatcher, String user, ApplicationId appId, Credentials credentials, Context context) { @@ -167,6 +173,10 @@ public ApplicationState getApplicationState() { new InitContainerTransition()) .addTransition(ApplicationState.RUNNING, ApplicationState.RUNNING, + ApplicationEventType.START_CONTAINER, + new StartContainerTransition()) + .addTransition(ApplicationState.RUNNING, + ApplicationState.RUNNING, ApplicationEventType.APPLICATION_CONTAINER_FINISHED, CONTAINER_DONE_TRANSITION) .addTransition( @@ -302,15 +312,18 @@ public void transition(ApplicationImpl app, ApplicationEvent event) { app.containers.put(container.getContainerId(), container); LOG.info("Adding " + container.getContainerId() + " to application " + app.toString()); - + + ContainerInitEvent containerInitEvent = new ContainerInitEvent( + container.getContainerId(), initEvent.shouldStartAfterInit()); + switch (app.getApplicationState()) { case RUNNING: - app.dispatcher.getEventHandler().handle(new ContainerInitEvent( - container.getContainerId())); + app.dispatcher.getEventHandler().handle(containerInitEvent); break; case INITING: case NEW: // these get queued up and sent out in AppInitDoneTransition + app.pendingContainerInitEvents.add(containerInitEvent); break; default: assert false : "Invalid state for InitContainerTransition: " + @@ -325,9 +338,9 @@ public void transition(ApplicationImpl app, ApplicationEvent event) { @Override public void transition(ApplicationImpl app, ApplicationEvent event) { // Start all the containers waiting for ApplicationInit - for (Container container : app.containers.values()) { - app.dispatcher.getEventHandler().handle(new ContainerInitEvent( - container.getContainerId())); + while (!app.pendingContainerInitEvents.isEmpty()) { + 
app.dispatcher.getEventHandler() + .handle(app.pendingContainerInitEvents.remove()); } } } @@ -442,6 +455,20 @@ public void transition(ApplicationImpl app, ApplicationEvent event) { } } + @SuppressWarnings("unchecked") + static class StartContainerTransition implements + SingleArcTransition { + @Override + public void transition(ApplicationImpl app, ApplicationEvent event) { + ApplicationContainerStartEvent startEvent = + (ApplicationContainerStartEvent) event; + Container container = startEvent.getContainer(); + + app.dispatcher.getEventHandler().handle(new ContainerStartEvent( + container.getContainerId())); + } + } + @Override public void handle(ApplicationEvent event) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java index 7571964..c3ea338 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java @@ -67,6 +67,16 @@ void setLogDir(String logDir); + boolean shouldStartAfterInit(); + + void setStartAfterInit(boolean startAfterInit); + + long getDestroyDelay(); + + boolean allowsRestart(); + + boolean isAwaitingStart(); + String toString(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java index 5622f8c..815e4df 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java @@ -22,6 +22,8 @@ // Producer: ContainerManager INIT_CONTAINER, + START_CONTAINER, + STOP_CONTAINER, KILL_CONTAINER, UPDATE_DIAGNOSTICS_MSG, CONTAINER_DONE, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index b1ddc2e..4c2677f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -29,6 +29,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -38,6 +39,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import 
org.apache.hadoop.security.Credentials; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; @@ -58,11 +60,13 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationContainerFinishedEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.CleanupContainersLauncherEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationReleaseEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadEvent; @@ -113,6 +117,9 @@ private int remainingRetryAttempts; private String workDir; private String logDir; + // the time to wait before destroying the container after completion + private long destroyDelay; + private AtomicBoolean startAfterInit = new AtomicBoolean(false); /** The NM-wide configuration - not 
specific to this container */ private final Configuration daemonConf; @@ -143,7 +150,8 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, ContainerLaunchContext launchContext, Credentials creds, NodeManagerMetrics metrics, - ContainerTokenIdentifier containerTokenIdentifier, Context context) { + ContainerTokenIdentifier containerTokenIdentifier, Context context, + long destroyDelay) { this.daemonConf = conf; this.dispatcher = dispatcher; this.stateStore = context.getNMStateStore(); @@ -158,6 +166,7 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, this.diagnosticsMaxSize = conf.getInt( YarnConfiguration.NM_CONTAINER_DIAGNOSTICS_MAXIMUM_SIZE, YarnConfiguration.DEFAULT_NM_CONTAINER_DIAGNOSTICS_MAXIMUM_SIZE); + this.destroyDelay = destroyDelay; this.containerTokenIdentifier = containerTokenIdentifier; this.containerId = containerTokenIdentifier.getContainerID(); this.resource = containerTokenIdentifier.getResource(); @@ -188,6 +197,14 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, stateMachine = stateMachineFactory.make(this); } + public ContainerImpl(Configuration conf, Dispatcher dispatcher, + ContainerLaunchContext launchContext, Credentials creds, + NodeManagerMetrics metrics, + ContainerTokenIdentifier containerTokenIdentifier, Context context) { + this(conf, dispatcher, launchContext, creds, metrics, + containerTokenIdentifier, context, 0L); + } + // constructor for a recovered container public ContainerImpl(Configuration conf, Dispatcher dispatcher, ContainerLaunchContext launchContext, Credentials creds, @@ -210,6 +227,7 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, this.remainingRetryAttempts = rcs.getRemainingRetryAttempts(); this.workDir = rcs.getWorkDir(); this.logDir = rcs.getLogDir(); + this.destroyDelay = rcs.getDestroyDelay(); } private static final ContainerDiagnosticsUpdateTransition UPDATE_DIAGNOSTICS_TRANSITION = @@ -274,20 +292,37 @@ public ContainerImpl(Configuration conf, 
Dispatcher dispatcher, // From LOCALIZED State .addTransition(ContainerState.LOCALIZED, ContainerState.RUNNING, ContainerEventType.CONTAINER_LAUNCHED, new LaunchTransition()) - .addTransition(ContainerState.LOCALIZED, ContainerState.EXITED_WITH_FAILURE, - ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, - new ExitedWithFailureTransition(true)) .addTransition(ContainerState.LOCALIZED, ContainerState.LOCALIZED, ContainerEventType.UPDATE_DIAGNOSTICS_MSG, UPDATE_DIAGNOSTICS_TRANSITION) .addTransition(ContainerState.LOCALIZED, ContainerState.KILLING, ContainerEventType.KILL_CONTAINER, new KillTransition()) + .addTransition(ContainerState.LOCALIZED, ContainerState.LAUNCHING, + ContainerEventType.START_CONTAINER, + new StartContainerTransition()) + .addTransition(ContainerState.LOCALIZED, + EnumSet.of(ContainerState.LOCALIZING, + ContainerState.LOCALIZED), ContainerEventType.INIT_CONTAINER, + new RequestResourcesTransition()) + // From LAUNCHING State + .addTransition(ContainerState.LAUNCHING, ContainerState.LAUNCHING, + ContainerEventType.UPDATE_DIAGNOSTICS_MSG, + UPDATE_DIAGNOSTICS_TRANSITION) + .addTransition(ContainerState.LAUNCHING, ContainerState.RUNNING, + ContainerEventType.CONTAINER_LAUNCHED, new LaunchTransition()) + .addTransition(ContainerState.LAUNCHING, ContainerState.EXITED_WITH_FAILURE, + ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, + new ExitedWithFailureTransition(true)) + .addTransition(ContainerState.LAUNCHING, ContainerState.KILLING, + ContainerEventType.KILL_CONTAINER, new KillTransition()) + // From RUNNING State .addTransition(ContainerState.RUNNING, - ContainerState.EXITED_WITH_SUCCESS, + EnumSet.of(ContainerState.EXITED_WITH_SUCCESS, + ContainerState.LOCALIZED), ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS, - new ExitedWithSuccessTransition(true)) + new ExitedWithSuccessTransition()) .addTransition(ContainerState.RUNNING, EnumSet.of(ContainerState.RELAUNCHING, ContainerState.EXITED_WITH_FAILURE), @@ -301,7 +336,9 @@ public 
ContainerImpl(Configuration conf, Dispatcher dispatcher, .addTransition(ContainerState.RUNNING, ContainerState.EXITED_WITH_FAILURE, ContainerEventType.CONTAINER_KILLED_ON_REQUEST, new KilledExternallyTransition()) - + .addTransition(ContainerState.RUNNING, ContainerState.STOPPING, + ContainerEventType.STOP_CONTAINER, new StopTransition()) + // From RELAUNCHING State .addTransition(ContainerState.RELAUNCHING, ContainerState.RUNNING, ContainerEventType.CONTAINER_LAUNCHED, new LaunchTransition()) @@ -315,6 +352,20 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, .addTransition(ContainerState.RELAUNCHING, ContainerState.KILLING, ContainerEventType.KILL_CONTAINER, new KillTransition()) + // From STOPPING State + .addTransition(ContainerState.STOPPING, ContainerState.STOPPING, + ContainerEventType.STOP_CONTAINER, new StopTransition()) + .addTransition(ContainerState.STOPPING, ContainerState.STOPPING, + ContainerEventType.UPDATE_DIAGNOSTICS_MSG, + UPDATE_DIAGNOSTICS_TRANSITION) + .addTransition(ContainerState.STOPPING, ContainerState.LOCALIZED, + EnumSet.of(ContainerEventType.CONTAINER_KILLED_ON_REQUEST, + ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS, + ContainerEventType.CONTAINER_EXITED_WITH_FAILURE), + new ContainerStoppedTransition()) + .addTransition(ContainerState.STOPPING, ContainerState.KILLING, + ContainerEventType.KILL_CONTAINER, new KillTransition()) + // From CONTAINER_EXITED_WITH_SUCCESS State .addTransition(ContainerState.EXITED_WITH_SUCCESS, ContainerState.DONE, ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP, @@ -358,7 +409,7 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, ContainerEventType.KILL_CONTAINER) .addTransition(ContainerState.KILLING, ContainerState.EXITED_WITH_SUCCESS, ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS, - new ExitedWithSuccessTransition(false)) + new ExitedWithSuccessOnKillingTransition()) .addTransition(ContainerState.KILLING, ContainerState.EXITED_WITH_FAILURE, 
ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, new ExitedWithFailureTransition(false)) @@ -412,13 +463,18 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, stateMachine; public org.apache.hadoop.yarn.api.records.ContainerState getCurrentState() { + if(this.isAwaitingStart()) { + return org.apache.hadoop.yarn.api.records.ContainerState.AWAITING_START; + } switch (stateMachine.getCurrentState()) { case NEW: case LOCALIZING: case LOCALIZATION_FAILED: case LOCALIZED: + case LAUNCHING: case RUNNING: case RELAUNCHING: + case STOPPING: case EXITED_WITH_SUCCESS: case EXITED_WITH_FAILURE: case KILLING: @@ -446,6 +502,7 @@ public String getUser() { this.readLock.lock(); try { if (ContainerState.LOCALIZED == getContainerState() + || ContainerState.LAUNCHING == getContainerState() || ContainerState.RELAUNCHING == getContainerState()) { return localizedResources; } else { @@ -559,6 +616,37 @@ public void setLogDir(String logDir) { this.logDir = logDir; } + @Override + public boolean shouldStartAfterInit() { + return this.startAfterInit.get(); + } + + @Override + public void setStartAfterInit(boolean startAfterInit) { + this.startAfterInit.set(startAfterInit); + } + + @Override + public long getDestroyDelay() { + return destroyDelay; + } + + @Override + public boolean allowsRestart() { + return destroyDelay > 0; + } + + @Override + public boolean isAwaitingStart() { + this.readLock.lock(); + try { + return this.getContainerState() == ContainerState.LOCALIZED + && !this.startAfterInit.get(); + } finally { + this.readLock.unlock(); + } + } + @SuppressWarnings("unchecked") private void sendFinishedEvents() { // Inform the application @@ -579,6 +667,11 @@ private void sendLaunchEvent() { if (recoveredStatus == RecoveredContainerStatus.LAUNCHED) { // try to recover a container that was previously launched launcherEvent = ContainersLauncherEventType.RECOVER_CONTAINER; + // for container that allows restart we should + // avoid trying to recover again + 
+ if(allowsRestart()) { + recoveredStatus = RecoveredContainerStatus.REQUESTED; + } } containerLaunchStartTime = clock.getTime(); dispatcher.getEventHandler().handle( @@ -586,6 +679,12 @@ private void sendLaunchEvent() { } @SuppressWarnings("unchecked") // dispatcher not typed + private void sendStartEvent() { + dispatcher.getEventHandler() + .handle(new ContainerStartEvent(this.getContainerId())); + } + + @SuppressWarnings("unchecked") // dispatcher not typed private void sendRelaunchEvent() { ContainersLauncherEventType launcherEvent = ContainersLauncherEventType.RELAUNCH_CONTAINER; @@ -647,6 +746,39 @@ public void cleanup() { new ContainerLocalizationCleanupEvent(this, rsrc)); } + @SuppressWarnings("unchecked") // dispatcher not typed + public void releaseLocalResources() { + Map> rsrc = + new HashMap>(); + if (!publicRsrcs.isEmpty()) { + Collection releasePublicRsrcs = + new ArrayList(); + releasePublicRsrcs.addAll(publicRsrcs); + publicRsrcs.clear(); + rsrc.put(LocalResourceVisibility.PUBLIC, releasePublicRsrcs); + } + if (!privateRsrcs.isEmpty()) { + Collection releasePrivateRsrcs = + new ArrayList(); + releasePrivateRsrcs.addAll(privateRsrcs); + privateRsrcs.clear(); + rsrc.put(LocalResourceVisibility.PRIVATE, releasePrivateRsrcs); + } + if (!appRsrcs.isEmpty()) { + Collection releaseAppRsrcs = + new ArrayList(); + releaseAppRsrcs.addAll(appRsrcs); + appRsrcs.clear(); + rsrc.put(LocalResourceVisibility.APPLICATION, releaseAppRsrcs); + } + dispatcher.getEventHandler().handle( + new ContainerLocalizationReleaseEvent(this, rsrc)); + localizedResources.clear(); + // TODO: should we also clear resourcesToBeUploaded and + // resourcesUploadPolicies? 
+ } + static class ContainerTransition implements SingleArcTransition { @@ -695,7 +827,43 @@ public ContainerState transition(ContainerImpl container, final ContainerLaunchContext ctxt = container.launchContext; container.metrics.initingContainer(); + + ContainerInitEvent initEvent = (ContainerInitEvent) event; + container.setStartAfterInit(initEvent.shouldStartAfterInit()); + final StartContainerRequest updatedStartRequest = + initEvent.getUpdatedStartRequest(); + + if(updatedStartRequest != null) { + // application master has requested a container reinitialization + // we must free current local resources and update the container launch + // context + + final ContainerLaunchContext updatedCtxt = updatedStartRequest.getContainerLaunchContext(); + + StartContainerRequest storestartRequest = StartContainerRequest + .newInstance(ctxt, updatedStartRequest.getContainerToken()); + try { + container.context.getNMStateStore() + .storeContainer(container.getContainerId(), storestartRequest); + } catch (IOException e) { + // TODO: is it ok to just report a warning, or should we do a no-op + // instead of the reinitialization? + LOG.warn( + "Unable to update container launch context in state store for " + + container.getContainerId(), e); + } + + ctxt.setCommands(updatedCtxt.getCommands()); + ctxt.setLocalResources(updatedCtxt.getLocalResources()); + ctxt.setEnvironment(updatedCtxt.getEnvironment()); + ctxt.setServiceData(updatedCtxt.getServiceData()); + // TODO: do we want to update other container launch context fields + // after reinitialization? 
+ + container.releaseLocalResources(); + } + container.dispatcher.getEventHandler().handle(new AuxServicesEvent (AuxServicesEventType.CONTAINER_INIT, container)); @@ -771,7 +939,9 @@ public ContainerState transition(ContainerImpl container, new ContainerLocalizationRequestEvent(container, req)); return ContainerState.LOCALIZING; } else { - container.sendLaunchEvent(); + if(initEvent.shouldStartAfterInit()) { + container.sendStartEvent(); + } container.metrics.endInitingContainer(); return ContainerState.LOCALIZED; } @@ -835,7 +1005,9 @@ public ContainerState transition(ContainerImpl container, new ContainerLocalizationEvent(LocalizationEventType. CONTAINER_RESOURCES_LOCALIZED, container)); - container.sendLaunchEvent(); + if(container.startAfterInit.get()) { + container.sendStartEvent(); + } container.metrics.endInitingContainer(); // If this is a recovered container that has already launched, skip @@ -859,8 +1031,9 @@ public ContainerState transition(ContainerImpl container, } /** - * Transition from LOCALIZED state to RUNNING state upon receiving - * a CONTAINER_LAUNCHED event + * Transitions upon receiving CONTAINER_LAUNCHED: + * - LAUNCHING -> RUNNING + * - LOCALIZED -> RUNNING (used for container recovery) */ static class LaunchTransition extends ContainerTransition { @SuppressWarnings("unchecked") @@ -869,44 +1042,53 @@ public void transition(ContainerImpl container, ContainerEvent event) { container.sendContainerMonitorStartEvent(); container.metrics.runningContainer(); container.wasLaunched = true; + container.setStartAfterInit(false); if (container.recoveredAsKilled) { LOG.info("Killing " + container.containerId + " due to recovered as killed"); container.addDiagnostics("Container recovered as killed.\n"); container.dispatcher.getEventHandler().handle( - new ContainersLauncherEvent(container, - ContainersLauncherEventType.CLEANUP_CONTAINER)); + new CleanupContainersLauncherEvent(container, true)); } } } /** - * Transition from RUNNING or KILLING state to 
EXITED_WITH_SUCCESS state - * upon EXITED_WITH_SUCCESS message. + * Transition from RUNNING state to EXITED_WITH_SUCCESS or + * LOCALIZED state upon EXITED_WITH_SUCCESS message. */ @SuppressWarnings("unchecked") // dispatcher not typed - static class ExitedWithSuccessTransition extends ContainerTransition { + static class ExitedWithSuccessTransition implements + MultipleArcTransition { - boolean clCleanupRequired; + @Override + public ContainerState transition(ContainerImpl container, ContainerEvent event) { + // Set exit code to 0 on success + container.exitCode = 0; - public ExitedWithSuccessTransition(boolean clCleanupRequired) { - this.clCleanupRequired = clCleanupRequired; + if(container.allowsRestart()) { + return ContainerState.LOCALIZED; + } + + container.dispatcher.getEventHandler().handle( + new CleanupContainersLauncherEvent(container, true)); + container.cleanup(); + + return ContainerState.EXITED_WITH_SUCCESS; } + } + + /** + * Transition from KILLING state to EXITED_WITH_SUCCESS state + * upon EXITED_WITH_SUCCESS message. + */ + static class ExitedWithSuccessOnKillingTransition extends ContainerTransition { @Override public void transition(ContainerImpl container, ContainerEvent event) { - // Set exit code to 0 on success + // Set exit code to 0 on success container.exitCode = 0; - - // TODO: Add containerWorkDir to the deletion service. - - if (clCleanupRequired) { - container.dispatcher.getEventHandler().handle( - new ContainersLauncherEvent(container, - ContainersLauncherEventType.CLEANUP_CONTAINER)); - } - container.cleanup(); } } @@ -932,13 +1114,9 @@ public void transition(ContainerImpl container, ContainerEvent event) { container.addDiagnostics(exitEvent.getDiagnosticInfo(), "\n"); } - // TODO: Add containerWorkDir to the deletion service. - // TODO: Add containerOuputDir to the deletion service. 
- if (clCleanupRequired) { container.dispatcher.getEventHandler().handle( - new ContainersLauncherEvent(container, - ContainersLauncherEventType.CLEANUP_CONTAINER)); + new CleanupContainersLauncherEvent(container, true)); } container.cleanup(); @@ -1013,6 +1191,20 @@ public void run() { } } + /** + * Transition from LOCALIZED to LAUNCHING + * upon receiving START_CONTAINER event. + */ + static class StartContainerTransition implements + SingleArcTransition { + + @Override + public void transition(ContainerImpl container, ContainerEvent event) { + // TODO: if allows restart, stop monitoring for container destroy + container.sendLaunchEvent(); + } + } + @Override public boolean isRetryContextSet() { return containerRetryContext.getRetryPolicy() @@ -1127,8 +1319,7 @@ public void transition(ContainerImpl container, ContainerEvent event) { public void transition(ContainerImpl container, ContainerEvent event) { // Kill the process/process-grp container.dispatcher.getEventHandler().handle( - new ContainersLauncherEvent(container, - ContainersLauncherEventType.CLEANUP_CONTAINER)); + new CleanupContainersLauncherEvent(container, true)); ContainerKillEvent killEvent = (ContainerKillEvent) event; container.addDiagnostics(killEvent.getDiagnostic(), "\n"); container.exitCode = killEvent.getContainerExitStatus(); @@ -1136,6 +1327,22 @@ public void transition(ContainerImpl container, ContainerEvent event) { } /** + * Transitions from RUNNING to STOPPING upon receiving STOP_CONTAINER + */ + @SuppressWarnings("unchecked") // dispatcher not typed + static class StopTransition implements + SingleArcTransition { + @Override + public void transition(ContainerImpl container, ContainerEvent event) { + LOG.info("Stopping Container " + container.getContainerId() + "."); + container.dispatcher.getEventHandler().handle( + new CleanupContainersLauncherEvent(container, false)); + container.addDiagnostics("Container is being stopped by application master.", + "\n"); + } + } + + /** * Transition 
from KILLING to CONTAINER_CLEANEDUP_AFTER_KILL * upon receiving CONTAINER_KILLED_ON_REQUEST. */ @@ -1159,6 +1366,29 @@ public void transition(ContainerImpl container, ContainerEvent event) { } /** + * Transition from STOPPING to LOCALIZED upon receiving + * CONTAINER_KILLED_ON_REQUEST or CONTAINER_EXITED_WITH_SUCCESS. + */ + static class ContainerStoppedTransition implements + SingleArcTransition { + @Override + public void transition(ContainerImpl container, ContainerEvent event) { + ContainerExitEvent exitEvent = (ContainerExitEvent) event; + + // should use an explicit exit code for stopping? + container.exitCode = ContainerExitStatus.KILLED_BY_APPMASTER; + if (exitEvent.getDiagnosticInfo() != null) { + container.addDiagnostics(exitEvent.getDiagnosticInfo(), "\n"); + } + + container.wasLaunched = false; + container.metrics.endRunningContainer(); + + // TODO: if allows restart, start monitoring container for destroy delay + } + } + + /** * Handle the following transitions: * - {LOCALIZATION_FAILED, EXITED_WITH_SUCCESS, EXITED_WITH_FAILURE, * KILLING, CONTAINER_CLEANEDUP_AFTER_KILL} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerInitEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerInitEvent.java index 56421d9..d1e2e9e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerInitEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerInitEvent.java @@ -17,12 +17,31 @@ */ package 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.records.ContainerId; public class ContainerInitEvent extends ContainerEvent { + private final boolean startAfterInit; + private final StartContainerRequest updatedStartRequest; - public ContainerInitEvent(ContainerId c) { + public ContainerInitEvent(ContainerId c, boolean startAfterInit, + StartContainerRequest updatedStartRequest) { super(c, ContainerEventType.INIT_CONTAINER); + this.startAfterInit = startAfterInit; + this.updatedStartRequest = updatedStartRequest; } + public ContainerInitEvent(ContainerId c, boolean startAfterInit) { + super(c, ContainerEventType.INIT_CONTAINER); + this.startAfterInit = startAfterInit; + this.updatedStartRequest = null; + } + + public boolean shouldStartAfterInit() { + return this.startAfterInit; + } + + public StartContainerRequest getUpdatedStartRequest() { + return this.updatedStartRequest; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java index 6b96204..5dbca1a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java @@ -19,7 +19,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.container; public enum ContainerState { - NEW, LOCALIZING, LOCALIZATION_FAILED, LOCALIZED, 
RUNNING, RELAUNCHING, - EXITED_WITH_SUCCESS, EXITED_WITH_FAILURE, KILLING, + NEW, LOCALIZING, LOCALIZATION_FAILED, LOCALIZED, LAUNCHING, RUNNING, + RELAUNCHING, STOPPING, EXITED_WITH_SUCCESS, EXITED_WITH_FAILURE, KILLING, CONTAINER_CLEANEDUP_AFTER_KILL, CONTAINER_RESOURCES_CLEANINGUP, DONE } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java index a3b53e3..9d70393 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java @@ -178,9 +178,14 @@ public Integer call() { String appIdStr = app.getAppId().toString(); String relativeContainerLogDir = ContainerLaunch .getRelativeContainerLogDir(appIdStr, containerIdStr); - containerLogDir = - dirsHandler.getLogPathForWrite(relativeContainerLogDir, false); - recordContainerLogDir(containerID, containerLogDir.toString()); + String recordedContainerLogDir = container.getLogDir(); + if(recordedContainerLogDir != null) { + containerLogDir = new Path(recordedContainerLogDir); + } else { + containerLogDir = + dirsHandler.getLogPathForWrite(relativeContainerLogDir, false); + recordContainerLogDir(containerID, containerLogDir.toString()); + } for (String str : command) { // TODO: Should we instead work via symlinks without this grammar? 
newCmds.add(expandEnvironment(str, containerLogDir)); @@ -215,14 +220,26 @@ public Integer call() { DataOutputStream tokensOutStream = null; // Select the working directory for the container - Path containerWorkDir = - dirsHandler.getLocalPathForWrite(ContainerLocalizer.USERCACHE - + Path.SEPARATOR + user + Path.SEPARATOR - + ContainerLocalizer.APPCACHE + Path.SEPARATOR + appIdStr - + Path.SEPARATOR + containerIdStr, - LocalDirAllocator.SIZE_UNKNOWN, false); - recordContainerWorkDir(containerID, containerWorkDir.toString()); - + String recordedWorkDir = container.getWorkDir(); + Path containerWorkDir; + if(recordedWorkDir != null) { + containerWorkDir = new Path(recordedWorkDir); + // remove content of working directory for a clean start + // since data from a previous run might still be present + LOG.info("Removing working directory: " + recordedWorkDir + + " for container: " + containerIdStr); + lfs.delete(containerWorkDir, true); + + // TODO: let the application master specify a policy for data removal + } else { + containerWorkDir = + dirsHandler.getLocalPathForWrite(ContainerLocalizer.USERCACHE + + Path.SEPARATOR + user + Path.SEPARATOR + + ContainerLocalizer.APPCACHE + Path.SEPARATOR + appIdStr + + Path.SEPARATOR + containerIdStr, + LocalDirAllocator.SIZE_UNKNOWN, false); + recordContainerWorkDir(containerID, containerWorkDir.toString()); + } String pidFileSubpath = getPidFileSubpath(appIdStr, containerIdStr); // pid file should be in nm private dir so that it is not @@ -391,7 +408,8 @@ protected void setContainerCompletedStatus(int exitCode) { completed.set(true); exec.deactivateContainer(containerId); try { - if (!container.shouldRetry(exitCode)) { + if (!container.shouldRetry(exitCode) && !(container.allowsRestart() + && container.getContainerState() == ContainerState.STOPPING)) { context.getNMStateStore().storeContainerCompleted(containerId, exitCode); } @@ -518,16 +536,18 @@ protected String getPidFileSubpath(String appIdStr, String containerIdStr) { * 
@throws IOException */ @SuppressWarnings("unchecked") // dispatcher not typed - public void cleanupContainer() throws IOException { + public void cleanupContainer(boolean storeAsCompleted) throws IOException { ContainerId containerId = container.getContainerId(); String containerIdStr = ConverterUtils.toString(containerId); LOG.info("Cleaning up container " + containerIdStr); - try { - context.getNMStateStore().storeContainerKilled(containerId); - } catch (IOException e) { - LOG.error("Unable to mark container " + containerId - + " killed in store", e); + if(storeAsCompleted) { + try { + context.getNMStateStore().storeContainerKilled(containerId); + } catch (IOException e) { + LOG.error("Unable to mark container " + containerId + + " killed in store", e); + } } // launch flag will be set to true if process already launched @@ -1163,7 +1183,7 @@ public static String getExitCodeFile(String pidFile) { private void recordContainerLogDir(ContainerId containerId, String logDir) throws IOException{ - if (container.isRetryContextSet()) { + if (container.isRetryContextSet() || container.allowsRestart()) { container.setLogDir(logDir); context.getNMStateStore().storeContainerLogDir(containerId, logDir); } @@ -1171,7 +1191,7 @@ private void recordContainerLogDir(ContainerId containerId, private void recordContainerWorkDir(ContainerId containerId, String workDir) throws IOException{ - if (container.isRetryContextSet()) { + if (container.isRetryContextSet() || container.allowsRestart()) { container.setWorkDir(workDir); context.getNMStateStore().storeContainerWorkDir(containerId, workDir); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java 
index e5fff00..118e64f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java @@ -137,6 +137,8 @@ public void handle(ContainersLauncherEvent event) { running.put(containerId, launch); break; case CLEANUP_CONTAINER: + CleanupContainersLauncherEvent cleanupEvent = + (CleanupContainersLauncherEvent) event; ContainerLaunch launcher = running.remove(containerId); if (launcher == null) { // Container not launched. So nothing needs to be done. @@ -146,7 +148,7 @@ public void handle(ContainersLauncherEvent event) { // Cleanup a container whether it is running/killed/completed, so that // no sub-processes are alive. try { - launcher.cleanupContainer(); + launcher.cleanupContainer(cleanupEvent.getStoreAsCompleted()); } catch (IOException e) { LOG.warn("Got exception while cleaning container " + containerId + ". 
Ignoring."); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoveredContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoveredContainerLaunch.java index 66f5a2a..3848e53 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoveredContainerLaunch.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoveredContainerLaunch.java @@ -71,6 +71,20 @@ public Integer call() { containerId.getApplicationAttemptId().getApplicationId()); String containerIdStr = ConverterUtils.toString(containerId); + if (container.allowsRestart() + && locatePidFile(appIdStr, containerIdStr) == null) { + // either the container was not started or was waiting + // for start, in both cases we leave the container in localized state + // waiting for requests from the application master + container.setStartAfterInit(false); + + // TODO: start monitoring container for destroy delay + + return 0; + } + + // TODO: recover stopping state for containers that allow restart + dispatcher.getEventHandler().handle(new ContainerEvent(containerId, ContainerEventType.CONTAINER_LAUNCHED)); @@ -97,14 +111,7 @@ public Integer call() { notInterrupted = false; } finally { if (notInterrupted) { - this.completed.set(true); - exec.deactivateContainer(containerId); - try { - getContext().getNMStateStore().storeContainerCompleted(containerId, - retCode); - } catch (IOException e) { - LOG.error("Unable to set exit code for container " + containerId); - } + setContainerCompletedStatus(retCode); } } diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index b2413ad..ae98e82 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -112,6 +112,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationReleaseEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType; @@ -423,6 +424,9 @@ public void handle(LocalizationEvent event) { case CLEANUP_CONTAINER_RESOURCES: handleCleanupContainerResources((ContainerLocalizationCleanupEvent)event); break; + case RELEASE_CONTAINER_RESOURCES: + handleReleaseContainerResources((ContainerLocalizationReleaseEvent)event); + break; case DESTROY_APPLICATION_RESOURCES: 
handleDestroyApplicationResources( ((ApplicationLocalizationEvent)event).getApplication()); @@ -516,21 +520,9 @@ private void handleCacheCleanup(LocalizationEvent event) { private void handleCleanupContainerResources( ContainerLocalizationCleanupEvent rsrcCleanup) { Container c = rsrcCleanup.getContainer(); - Map> rsrcs = - rsrcCleanup.getResources(); - for (Map.Entry> e : - rsrcs.entrySet()) { - LocalResourcesTracker tracker = getLocalResourcesTracker(e.getKey(), c.getUser(), - c.getContainerId().getApplicationAttemptId() - .getApplicationId()); - for (LocalResourceRequest req : e.getValue()) { - tracker.handle(new ResourceReleaseEvent(req, - c.getContainerId())); - } - } - String locId = ConverterUtils.toString(c.getContainerId()); - localizerTracker.cleanupPrivLocalizers(locId); - + handleReleaseContainerResources( + new ContainerLocalizationReleaseEvent(c, rsrcCleanup.getResources())); + // Delete the container directories String userName = c.getUser(); String containerIDStr = c.toString(); @@ -563,7 +555,26 @@ private void handleCleanupContainerResources( new ContainerEvent(c.getContainerId(), ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP)); } - + + private void handleReleaseContainerResources( + ContainerLocalizationReleaseEvent rsrcRelease) { + Container c = rsrcRelease.getContainer(); + Map> rsrcs = + rsrcRelease.getResources(); + for (Map.Entry> e : + rsrcs.entrySet()) { + LocalResourcesTracker tracker = getLocalResourcesTracker(e.getKey(), c.getUser(), + c.getContainerId().getApplicationAttemptId() + .getApplicationId()); + for (LocalResourceRequest req : e.getValue()) { + tracker.handle(new ResourceReleaseEvent(req, + c.getContainerId())); + } + } + String locId = ConverterUtils.toString(c.getContainerId()); + localizerTracker.cleanupPrivLocalizers(locId); + } + private void submitDirForDeletion(String userName, Path dir) { try { lfs.getFileStatus(dir); diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java index 4785fba..06bdc8e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java @@ -22,6 +22,7 @@ INIT_CONTAINER_RESOURCES, CACHE_CLEANUP, CLEANUP_CONTAINER_RESOURCES, + RELEASE_CONTAINER_RESOURCES, DESTROY_APPLICATION_RESOURCES, CONTAINER_RESOURCES_LOCALIZED, } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerMetrics.java index d59abda..f21d2b4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerMetrics.java @@ -264,7 +264,7 @@ public void recordCpuUsage( } public void recordProcessId(String processId) { - registry.tag(PROCESSID_INFO, processId); + registry.tag(PROCESSID_INFO, 
processId, true); } public void recordResourceLimit(int vmemLimit, int pmemLimit, int cpuVcores) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java index 6e9efe1..002325f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java @@ -114,6 +114,7 @@ "/remainingRetryAttempts"; private static final String CONTAINER_WORK_DIR_KEY_SUFFIX = "/workdir"; private static final String CONTAINER_LOG_DIR_KEY_SUFFIX = "/logdir"; + private static final String CONTAINER_DESTROY_DELAY_SUFFIX = "/destroydelay"; private static final String CURRENT_MASTER_KEY_SUFFIX = "CurrentMasterKey"; private static final String PREV_MASTER_KEY_SUFFIX = "PreviousMasterKey"; @@ -258,6 +259,8 @@ private RecoveredContainerState loadContainerState(ContainerId containerId, rcs.setWorkDir(asString(entry.getValue())); } else if (suffix.equals(CONTAINER_LOG_DIR_KEY_SUFFIX)) { rcs.setLogDir(asString(entry.getValue())); + } else if (suffix.equals(CONTAINER_DESTROY_DELAY_SUFFIX)) { + rcs.setDestroyDelay(Long.parseLong(asString(entry.getValue()))); } else { throw new IOException("Unexpected container state key: " + key); } @@ -404,6 +407,18 @@ public void storeContainerLogDir(ContainerId containerId, } @Override + public void storeContainerDestroyDelay(ContainerId containerId, + long destroyDelay) throws IOException { + String key = CONTAINERS_KEY_PREFIX + containerId.toString() 
+ + CONTAINER_DESTROY_DELAY_SUFFIX; + try { + db.put(bytes(key), bytes(Long.toString(destroyDelay))); + } catch (DBException e) { + throw new IOException(e); + } + } + + @Override public void removeContainer(ContainerId containerId) throws IOException { if (LOG.isDebugEnabled()) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java index 08b80e9..b08a531 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java @@ -115,6 +115,11 @@ public void storeContainerLogDir(ContainerId containerId, } @Override + public void storeContainerDestroyDelay(ContainerId containerId, + long destroyDelay) throws IOException { + } + + @Override public void removeContainer(ContainerId containerId) throws IOException { } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java index ccf1e70..b71537d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java @@ -76,6 +76,7 @@ public NMStateStoreService(String name) { private int remainingRetryAttempts = ContainerRetryContext.RETRY_INVALID; private String workDir; private String logDir; + private long destroyDelay; public RecoveredContainerStatus getStatus() { return status; @@ -125,6 +126,14 @@ public void setLogDir(String logDir) { this.logDir = logDir; } + public long getDestroyDelay() { + return destroyDelay; + } + + public void setDestroyDelay(long destroyDelay) { + this.destroyDelay = destroyDelay; + } + @Override public String toString() { return new StringBuffer("Status: ").append(getStatus()) @@ -136,6 +145,7 @@ public String toString() { .append(", RemainingRetryAttempts: ").append(remainingRetryAttempts) .append(", WorkDir: ").append(workDir) .append(", LogDir: ").append(logDir) + .append(", DestroyDelay: ").append(destroyDelay) .toString(); } } @@ -310,7 +320,7 @@ public abstract void removeApplication(ApplicationId appId) */ public abstract void storeContainer(ContainerId containerId, StartContainerRequest startRequest) throws IOException; - + /** * Record that a container has been launched * @param containerId the container ID @@ -383,6 +393,15 @@ public abstract void storeContainerLogDir( ContainerId containerId, String logDir) throws IOException; /** + * Record destroy delay for a container. 
+ * @param containerId the container ID + * @param destroyDelay the destroy delay + * @throws IOException + */ + public abstract void storeContainerDestroyDelay( + ContainerId containerId, long destroyDelay) throws IOException; + + /** * Remove records corresponding to a container * @param containerId the container ID * @throws IOException diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerLogsUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerLogsUtils.java index 319f49b..b1801a1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerLogsUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerLogsUtils.java @@ -65,7 +65,7 @@ // aggregation to false, container log view request is forwarded to NM. NM // does not have completed container information,but still NM serve request for // reading container logs. 
- if (container != null) { + if (container != null && container.getLogDir() == null) { checkState(container.getContainerState()); } @@ -95,7 +95,7 @@ public static File getContainerLogFile(ContainerId containerId, Application application = getApplicationForContainer(containerId, context); checkAccess(remoteUser, application, context); - if (container != null) { + if (container != null && container.getLogDir() == null) { checkState(container.getContainerState()); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java index 3ff04d8..38325bc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java @@ -102,7 +102,6 @@ public void handle(LocalizationEvent event) { case CLEANUP_CONTAINER_RESOURCES: Container container = ((ContainerLocalizationEvent) event).getContainer(); - // TODO: delete the container dir this.dispatcher.getEventHandler().handle( new ContainerEvent(container.getContainerId(), ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP)); @@ -191,14 +190,6 @@ public void setBlockNewContainerRequests(boolean blockNewContainerRequests) { } @Override - protected void authorizeStartAndResourceIncreaseRequest( - NMTokenIdentifier nmTokenIdentifier, - ContainerTokenIdentifier containerTokenIdentifier, - boolean startRequest) throws YarnException { - // do nothing - } - - @Override protected void authorizeGetAndStopContainerRequest(ContainerId containerId, Container container, boolean stopRequest, 
NMTokenIdentifier identifier) throws YarnException { // do nothing diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java index 370a207..c8c5b56 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java @@ -567,10 +567,12 @@ public void initApplication() { public void initContainer(int containerNum) { if (containerNum == -1) { for (int i = 0; i < containers.size(); i++) { - app.handle(new ApplicationContainerInitEvent(containers.get(i))); + app.handle( + new ApplicationContainerInitEvent(containers.get(i), true)); } } else { - app.handle(new ApplicationContainerInitEvent(containers.get(containerNum))); + app.handle(new ApplicationContainerInitEvent( + containers.get(containerNum), true)); } drainDispatcherEvents(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java index 4652245..7e2e87d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java @@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.protocolrecords.InitializeContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -118,6 +119,7 @@ public synchronized void removeApplication(ApplicationId appId) rcsCopy.setRemainingRetryAttempts(rcs.getRemainingRetryAttempts()); rcsCopy.setWorkDir(rcs.getWorkDir()); rcsCopy.setLogDir(rcs.getLogDir()); + rcsCopy.setDestroyDelay(rcs.getDestroyDelay()); result.add(rcsCopy); } return result; @@ -192,6 +194,13 @@ public void storeContainerLogDir(ContainerId containerId, } @Override + public void storeContainerDestroyDelay(ContainerId containerId, + long destroyDelay) throws IOException { + RecoveredContainerState rcs = getRecoveredContainerState(containerId); + rcs.setDestroyDelay(destroyDelay); + } + + @Override public synchronized void removeContainer(ContainerId containerId) throws IOException { containerStates.remove(containerId); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java index ccc9254..cc673fa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java @@ -346,6 +346,14 @@ public void testContainerStorage() throws IOException { assertEquals("/test/workdir", rcs.getWorkDir()); assertEquals("/test/logdir", rcs.getLogDir()); + // store destroy delay + stateStore.storeContainerDestroyDelay(containerId, 32000); + restartStateStore(); + recoveredContainers = stateStore.loadContainersState(); + assertEquals(1, recoveredContainers.size()); + rcs = recoveredContainers.get(0); + assertEquals(32000, rcs.getDestroyDelay()); + // remove the container and verify not recovered stateStore.removeContainer(containerId); restartStateStore(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java index 0b95dba..214ca92 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java @@ -172,4 +172,28 @@ public String getLogDir() { @Override public void setLogDir(String logDir) { } + + @Override + public boolean shouldStartAfterInit() { + return false; + } + + @Override + public void setStartAfterInit(boolean startAfterInit) { + } + + @Override + public long getDestroyDelay() { + return 0; + } + + @Override + public boolean allowsRestart() { + return false; + } + + @Override + public boolean isAwaitingStart() { + return false; + } } diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java index b4ebf92..7b0e872 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java @@ -27,12 +27,16 @@ import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest; import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse; +import org.apache.hadoop.yarn.api.protocolrecords.InitializeContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.InitializeContainersResponse; import org.junit.Assert; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.DestroyContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.DestroyContainersResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; @@ -304,6 +308,18 @@ public IncreaseContainersResourceResponse increaseContainersResource( return null; } + @Override + public InitializeContainersResponse initializeContainers( + InitializeContainersRequest request) throws YarnException, IOException { + return null; + } + + @Override + public DestroyContainersResponse 
destroyContainers( + DestroyContainersRequest request) throws YarnException, IOException { + return null; + } + public static org.apache.hadoop.yarn.server.api.records.NodeStatus createNodeStatus(NodeId nodeId, List containers) { RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAMAuthorization.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAMAuthorization.java index 2787f1e..92d29dd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAMAuthorization.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAMAuthorization.java @@ -42,6 +42,10 @@ import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest; import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse; +import org.apache.hadoop.yarn.api.protocolrecords.InitializeContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.InitializeContainersResponse; +import org.apache.hadoop.yarn.api.protocolrecords.DestroyContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.DestroyContainersResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; @@ -130,6 +134,18 @@ public IncreaseContainersResourceResponse increaseContainersResource(IncreaseCon return 
IncreaseContainersResourceResponse.newInstance(null, null); } + @Override + public InitializeContainersResponse initializeContainers( + InitializeContainersRequest request) throws YarnException, IOException { + return InitializeContainersResponse.newInstance(null, null, null); + } + + @Override + public DestroyContainersResponse destroyContainers( + DestroyContainersRequest request) throws YarnException, IOException { + return DestroyContainersResponse.newInstance(null, null); + } + public Credentials getContainerCredentials() throws IOException { Credentials credentials = new Credentials(); DataInputByteBuffer buf = new DataInputByteBuffer(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java index 8cdb191..e3d9da3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java @@ -33,8 +33,12 @@ import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.DestroyContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.DestroyContainersResponse; import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest; import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse; +import 
org.apache.hadoop.yarn.api.protocolrecords.InitializeContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.InitializeContainersResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; @@ -145,6 +149,18 @@ public IncreaseContainersResourceResponse increaseContainersResource( throws YarnException { return null; } + + @Override + public InitializeContainersResponse initializeContainers( + InitializeContainersRequest request) throws YarnException, IOException { + return null; + } + + @Override + public DestroyContainersResponse destroyContainers( + DestroyContainersRequest request) throws YarnException, IOException { + return null; + } } @Test