diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/StopContainerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/StopContainerRequest.java
new file mode 100644
index 0000000..b0d023e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/StopContainerRequest.java
@@ -0,0 +1,73 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.api.protocolrecords;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * The request sent by the ApplicationMaster to the
+ * NodeManager to stop a container.
+ *
+ * @see ContainerManagementProtocol#stopContainers(StopContainersRequest)
+ */
+@Public
+@Unstable
+public abstract class StopContainerRequest {
+
+  @Public
+  @Unstable
+  public static StopContainerRequest newInstance(ContainerId containerId,
+      boolean dumpThreads) {
+    StopContainerRequest request =
+        Records.newRecord(StopContainerRequest.class);
+    request.setContainerId(containerId);
+    request.setDumpThreads(dumpThreads);
+    return request;
+  }
+
+  /**
+   * Get the ContainerId of the container to be stopped.
+   * @return ContainerId of the container to be stopped
+   */
+  @Public
+  @Unstable
+  public abstract ContainerId getContainerId();
+
+  /**
+   * Set the ContainerId of the container to be stopped.
+   * @param containerId ContainerId of the container to be stopped
+   */
+  @Public
+  @Unstable
+  public abstract void setContainerId(ContainerId containerId);
+
+  public abstract boolean getDumpThreads();
+
+  /**
+   * Set whether to dump the container's JVM threads before it is stopped.
+   * @param dumpThreads whether to dump threads before stopping the container
+   */
+  @Public
+  @Unstable
+  public abstract void setDumpThreads(boolean dumpThreads);
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/StopContainersRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/StopContainersRequest.java
index 8ea186c..7350696 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/StopContainersRequest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/StopContainersRequest.java
@@ -22,6 +22,7 @@
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Stable;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.util.Records;
@@ -45,6 +46,16 @@ public static StopContainersRequest newInstance(List<ContainerId> containerIds)
     return request;
   }
 
+  @Public
+  @Unstable
+  public static StopContainersRequest newInstance1(
+      List<StopContainerRequest> stopRequests) {
+    StopContainersRequest request =
+        Records.newRecord(StopContainersRequest.class);
+    request.setStopRequests(stopRequests);
+    return request;
+  }
+
   /**
    * Get the ContainerIds of the containers to be stopped.
    * @return ContainerIds of containers to be stopped
@@ -60,4 +71,18 @@ public static StopContainersRequest newInstance(List<ContainerId> containerIds)
   @Public
   @Stable
   public abstract void setContainerIds(List<ContainerId> containerIds);
+
+  /**
+   * @return the StopContainerRequest records for the containers to be stopped
+   */
+  @Public
+  @Unstable
+  public abstract List<StopContainerRequest> getStopRequests();
+
+  /**
+   * @param stopRequests StopContainerRequest records for the containers to be stopped
+   */
+  @Public
+  @Unstable
+  public abstract void setStopRequests(List<StopContainerRequest> stopRequests);
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
index eff5cd7..06c31be 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
@@ -202,6 +202,7 @@ message StartContainerResponseProto {
 
 message StopContainerRequestProto {
   optional ContainerIdProto container_id = 1;
+  optional bool dump_threads = 2;
 }
 
 message StopContainerResponseProto {
@@ -233,6 +234,7 @@ message StartContainersResponseProto {
 
 message StopContainersRequestProto {
   repeated ContainerIdProto container_id = 1;
+  repeated StopContainerRequestProto stop_requests = 2;
 }
 
 message StopContainersResponseProto {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/StopContainerRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/StopContainerRequestPBImpl.java
new file mode 100644
index 0000000..41a8d05
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/StopContainerRequestPBImpl.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
+
+
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
+import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.StopContainerRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.StopContainerRequestProtoOrBuilder;
+
+
+
+public class StopContainerRequestPBImpl extends StopContainerRequest {
+  StopContainerRequestProto proto = StopContainerRequestProto.getDefaultInstance();
+  StopContainerRequestProto.Builder builder = null;
+  boolean viaProto = false;
+
+  private ContainerId containerId = null;
+  private boolean dumpThreads;
+
+
+  public StopContainerRequestPBImpl() {
+    builder = StopContainerRequestProto.newBuilder();
+  }
+
+  public StopContainerRequestPBImpl(StopContainerRequestProto proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+
+  public StopContainerRequestProto getProto() {
+    mergeLocalToProto();
+    proto = viaProto ? proto : builder.build();
+    viaProto = true;
+    return proto;
+  }
+
+  private void mergeLocalToBuilder() {
+    if (this.containerId != null) {
+      builder.setContainerId(convertToProtoFormat(this.containerId));
+    }
+
+    if (this.dumpThreads) {
+      builder.setDumpThreads(this.dumpThreads);
+    }
+  }
+
+  private void mergeLocalToProto() {
+    if (viaProto)
+      maybeInitBuilder();
+    mergeLocalToBuilder();
+    proto = builder.build();
+    viaProto = true;
+  }
+
+  private void maybeInitBuilder() {
+    if (viaProto || builder == null) {
+      builder = StopContainerRequestProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+
+
+  @Override
+  public ContainerId getContainerId() {
+    StopContainerRequestProtoOrBuilder p = viaProto ? proto : builder;
+    if (this.containerId != null) {
+      return this.containerId;
+    }
+    if (!p.hasContainerId()) {
+      return null;
+    }
+    this.containerId = convertFromProtoFormat(p.getContainerId());
+    return this.containerId;
+  }
+
+  @Override
+  public void setContainerId(ContainerId containerId) {
+    maybeInitBuilder();
+    if (containerId == null)
+      builder.clearContainerId();
+    this.containerId = containerId;
+  }
+
+  @Override
+  public void setDumpThreads(boolean dumpThreads) {
+    maybeInitBuilder();
+    if (!dumpThreads) {
+      builder.clearDumpThreads();
+    }
+    this.dumpThreads = dumpThreads;
+  }
+
+  @Override
+  public boolean getDumpThreads() {
+    if (dumpThreads) {
+      return true;
+    }
+    StopContainerRequestProtoOrBuilder p = viaProto ? proto : builder;
+    return p.hasDumpThreads() ? p.getDumpThreads() : false;
+  }
+
+  private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) {
+    return new ContainerIdPBImpl(p);
+  }
+
+  private ContainerIdProto convertToProtoFormat(ContainerId t) {
+    return ((ContainerIdPBImpl)t).getProto();
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/StopContainersRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/StopContainersRequestPBImpl.java
index 27e092b..520a1a6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/StopContainersRequestPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/StopContainersRequestPBImpl.java
@@ -23,10 +23,12 @@
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
 import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.StopContainerRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.StopContainersRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.StopContainersRequestProtoOrBuilder;
 
@@ -41,6 +43,7 @@
   boolean viaProto = false;
 
   private List<ContainerId> containerIds = null;
+  private List<StopContainerRequest> stopRequests;
 
   public StopContainersRequestPBImpl() {
     builder = StopContainersRequestProto.newBuilder();
@@ -82,6 +85,9 @@ private void mergeLocalToBuilder() {
     if (this.containerIds != null) {
       addLocalContainerIdsToProto();
     }
+    if (stopRequests != null) {
+      addLocalRequestsToProto();
+    }
   }
 
   private void mergeLocalToProto() {
@@ -123,6 +129,32 @@ private void initLocalContainerIds() {
     }
   }
 
+  private void addLocalRequestsToProto() {
+    maybeInitBuilder();
+    builder.clearStopRequests();
+    if (stopRequests == null) {
+      return;
+    }
+    List<StopContainerRequestProto> protoList =
+        new ArrayList<StopContainerRequestProto>(stopRequests.size());
+    for (StopContainerRequest requestProto : stopRequests) {
+      protoList.add(convertToProtoFormat(requestProto));
+    }
+    builder.addAllStopRequests(protoList);
+  }
+
+  private void initLocalStopRequests() {
+    if (stopRequests != null) {
+      return;
+    }
+    StopContainersRequestProtoOrBuilder p = viaProto ? proto : builder;
+    List<StopContainerRequestProto> requestProtos = p.getStopRequestsList();
+    stopRequests = new ArrayList<StopContainerRequest>(requestProtos.size());
+    for (StopContainerRequestProto r : requestProtos) {
+      stopRequests.add(convertFromProtoFormat(r));
+    }
+  }
+
   @Override
   public List<ContainerId> getContainerIds() {
     initLocalContainerIds();
@@ -137,6 +169,21 @@ public void setContainerIds(List<ContainerId> containerIds) {
     this.containerIds = containerIds;
   }
 
+  @Override
+  public List<StopContainerRequest> getStopRequests() {
+    initLocalStopRequests();
+    return stopRequests;
+  }
+
+  @Override
+  public void setStopRequests(List<StopContainerRequest> stopRequests) {
+    maybeInitBuilder();
+    if (stopRequests == null) {
+      builder.clearStopRequests();
+    }
+    this.stopRequests = stopRequests;
+  }
+
   private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) {
     return new ContainerIdPBImpl(p);
   }
@@ -144,4 +191,14 @@ private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) {
   private ContainerIdProto convertToProtoFormat(ContainerId t) {
     return ((ContainerIdPBImpl) t).getProto();
   }
+
+  private StopContainerRequestPBImpl convertFromProtoFormat(
+      StopContainerRequestProto p) {
+    return new StopContainerRequestPBImpl(p);
+  }
+
+  private StopContainerRequestProto convertToProtoFormat(
+      StopContainerRequest t) {
+    return ((StopContainerRequestPBImpl) t).getProto();
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index dd3deb3..000a9b2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -60,6 +60,7 @@
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -700,7 +701,19 @@ public StopContainersResponse stopContainers(StopContainersRequest requests)
     NMTokenIdentifier identifier = selectNMTokenIdentifier(remoteUgi);
     for (ContainerId id : requests.getContainerIds()) {
       try {
-        stopContainerInternal(identifier, id);
+        stopContainerInternal(identifier, id, false);
+        succeededRequests.add(id);
+      } catch (YarnException e) {
+        failedRequests.put(id, SerializedException.newInstance(e));
+      }
+    }
+
+    // process composite rpc
+    for (StopContainerRequest r : requests.getStopRequests()) {
+      final ContainerId id = r.getContainerId();
+      final boolean dumpThreads = r.getDumpThreads();
+      try {
+        stopContainerInternal(identifier, id, dumpThreads);
         succeededRequests.add(id);
       } catch (YarnException e) {
         failedRequests.put(id, SerializedException.newInstance(e));
@@ -712,12 +725,12 @@ public StopContainersResponse stopContainers(StopContainersRequest requests)
 
   @SuppressWarnings("unchecked")
   private void stopContainerInternal(NMTokenIdentifier nmTokenIdentifier,
-      ContainerId containerID) throws YarnException {
+      ContainerId containerID, boolean dumpThreads) throws YarnException {
     String containerIDStr = containerID.toString();
     Container container = this.context.getContainers().get(containerID);
     LOG.info("Stopping container with container Id: " + containerIDStr);
     authorizeGetAndStopContainerRequest(containerID, container, true,
-      nmTokenIdentifier);
+        nmTokenIdentifier);
 
     if (container == null) {
       if (!nodeStatusUpdater.isContainerRecentlyStopped(containerID)) {
@@ -725,9 +738,8 @@ private void stopContainerInternal(NMTokenIdentifier nmTokenIdentifier,
             + " is not handled by this NodeManager");
       }
     } else {
-      dispatcher.getEventHandler().handle(
-        new ContainerKillEvent(containerID,
-          "Container killed by the ApplicationMaster."));
+      dispatcher.getEventHandler().handle(new ContainerKillEvent(containerID,
+        "Container killed by the ApplicationMaster.", dumpThreads));
       NMAuditLogger.logSuccess(container.getUser(),
         AuditConstants.STOP_CONTAINER, "ContainerManageImpl", containerID
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
index 862e3fa..d364b6a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
@@ -127,6 +127,8 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher,
   private static final ContainerDiagnosticsUpdateTransition
       UPDATE_DIAGNOSTICS_TRANSITION = new ContainerDiagnosticsUpdateTransition();
+  private static final KillTransition KILL_TRANSITION = new KillTransition();
+
   // State Machine for each container.
private static StateMachineFactory @@ -191,7 +193,7 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, ContainerEventType.UPDATE_DIAGNOSTICS_MSG, UPDATE_DIAGNOSTICS_TRANSITION) .addTransition(ContainerState.LOCALIZED, ContainerState.KILLING, - ContainerEventType.KILL_CONTAINER, new KillTransition()) + ContainerEventType.KILL_CONTAINER, KILL_TRANSITION) // From RUNNING State .addTransition(ContainerState.RUNNING, @@ -206,7 +208,7 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, ContainerEventType.UPDATE_DIAGNOSTICS_MSG, UPDATE_DIAGNOSTICS_TRANSITION) .addTransition(ContainerState.RUNNING, ContainerState.KILLING, - ContainerEventType.KILL_CONTAINER, new KillTransition()) + ContainerEventType.KILL_CONTAINER, KILL_TRANSITION) .addTransition(ContainerState.RUNNING, ContainerState.EXITED_WITH_FAILURE, ContainerEventType.CONTAINER_KILLED_ON_REQUEST, new KilledExternallyTransition()) @@ -796,11 +798,12 @@ public void transition(ContainerImpl container, ContainerEvent event) { SingleArcTransition { @Override public void transition(ContainerImpl container, ContainerEvent event) { + ContainerKillEvent killEvent = (ContainerKillEvent) event; // Kill the process/process-grp container.dispatcher.getEventHandler().handle( new ContainersLauncherEvent(container, - ContainersLauncherEventType.CLEANUP_CONTAINER)); - ContainerKillEvent killEvent = (ContainerKillEvent) event; + ContainersLauncherEventType.CLEANUP_CONTAINER, + killEvent.getDumpThreads())); container.diagnostics.append(killEvent.getDiagnostic()).append("\n"); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerKillEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerKillEvent.java index 313b6a8..26fb78e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerKillEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerKillEvent.java @@ -23,13 +23,24 @@ public class ContainerKillEvent extends ContainerEvent { private final String diagnostic; + private final boolean dumpThreads; public ContainerKillEvent(ContainerId cID, String diagnostic) { + this(cID, diagnostic, false); + } + + public ContainerKillEvent(ContainerId cID, String diagnostic, + boolean dumpThreads) { super(cID, ContainerEventType.KILL_CONTAINER); this.diagnostic = diagnostic; + this.dumpThreads = dumpThreads; } public String getDiagnostic() { return this.diagnostic; } + + public boolean getDumpThreads() { + return dumpThreads; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java index 8b08965..9dbc267 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java @@ -335,7 +335,7 @@ public Integer call() { * @throws IOException */ @SuppressWarnings("unchecked") // dispatcher not typed - public void cleanupContainer() throws IOException { + public void cleanupContainer(boolean dumpThreads) throws IOException { ContainerId containerId = container.getContainerId(); String containerIdStr = ConverterUtils.toString(containerId); LOG.info("Cleaning up container " + containerIdStr); @@ -372,6 +372,15 @@ public void cleanupContainer() throws IOException { // kill process if (processId != null) { String user = container.getUser(); + boolean result; + if (dumpThreads) { + result = exec.signalContainer(user, processId, Signal.QUIT); + if (!result && LOG.isDebugEnabled()) { + LOG.debug("Failed to send " + Signal.QUIT + " to " + processId + + " to generate JVM thread dump"); + } + } + LOG.debug("Sending signal to pid " + processId + " as user " + user + " for container " + containerIdStr); @@ -380,7 +389,7 @@ public void cleanupContainer() throws IOException { ? Signal.TERM : Signal.KILL; - boolean result = exec.signalContainer(user, processId, signal); + result = exec.signalContainer(user, processId, signal); LOG.debug("Sent signal " + signal + " to pid " + processId + " as user " + user diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java index ce865e3..6ddbf49 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java @@ -24,7 +24,6 @@ import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Future; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -32,21 +31,16 @@ import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.service.AbstractService; -import org.apache.hadoop.util.Shell; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; -import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; 
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService; import com.google.common.annotations.VisibleForTesting; @@ -135,7 +129,7 @@ public void handle(ContainersLauncherEvent event) { // Cleanup a container whether it is running/killed/completed, so that // no sub-processes are alive. try { - launcher.cleanupContainer(); + launcher.cleanupContainer(event.getDumpThreads()); } catch (IOException e) { LOG.warn("Got exception while cleaning container " + containerId + ". Ignoring."); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEvent.java index 38bedf2..bbf0d1d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEvent.java @@ -19,22 +19,33 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher; import org.apache.hadoop.yarn.event.AbstractEvent; -import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; public class ContainersLauncherEvent extends AbstractEvent{ private final Container container; + private final boolean dumpThreads; public ContainersLauncherEvent(Container container, ContainersLauncherEventType eventType) { + this(container, eventType, false); + } + + public ContainersLauncherEvent(Container container, + ContainersLauncherEventType eventType, + boolean dumpThreads) { super(eventType); this.container = container; + this.dumpThreads = dumpThreads; } public Container getContainer() { return container; } + public boolean getDumpThreads() { + return dumpThreads; + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java index 81cf797..4daa738 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java @@ -21,7 +21,6 @@ import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.mock; import static 
org.mockito.Mockito.when; -import static org.mockito.Mockito.spy; import java.io.BufferedReader; import java.io.File; @@ -52,6 +51,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -69,11 +69,11 @@ import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode; import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent; @@ -625,49 +625,7 @@ private void internalKillTest(boolean delayed) throws Exception { writer.println("while true; do\nsleep 1s;\ndone"); } writer.close(); - FileUtil.setExecutable(scriptFile, true); - - ContainerLaunchContext containerLaunchContext = - recordFactory.newRecordInstance(ContainerLaunchContext.class); - - // upload the script file so that the container can run it - URL resource_alpha = - ConverterUtils.getYarnUrlFromPath(localFS - .makeQualified(new Path(scriptFile.getAbsolutePath()))); - LocalResource rsrc_alpha = - recordFactory.newRecordInstance(LocalResource.class); - rsrc_alpha.setResource(resource_alpha); - rsrc_alpha.setSize(-1); - rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION); - rsrc_alpha.setType(LocalResourceType.FILE); - rsrc_alpha.setTimestamp(scriptFile.lastModified()); - String destinationFile = "dest_file.sh"; - Map localResources = - new HashMap(); - localResources.put(destinationFile, rsrc_alpha); - containerLaunchContext.setLocalResources(localResources); - - // set up the rest of the container - List commands = Arrays.asList(Shell.getRunScriptCommand(scriptFile)); - containerLaunchContext.setCommands(commands); - Token containerToken = createContainerToken(cId); - - StartContainerRequest scRequest = - StartContainerRequest.newInstance(containerLaunchContext, - containerToken); - List list = new ArrayList(); - list.add(scRequest); - StartContainersRequest allRequests = - StartContainersRequest.newInstance(list); - containerManager.startContainers(allRequests); - - int timeoutSecs = 0; - while (!processStartFile.exists() && timeoutSecs++ < 20) { - Thread.sleep(1000); - LOG.info("Waiting for process start-file to be created"); - } - Assert.assertTrue("ProcessStartFile doesn't exist!", - processStartFile.exists()); + startKillTestContainer(cId, processStartFile, scriptFile); // Now test the stop functionality. 
List containerIds = new ArrayList(); @@ -718,6 +676,51 @@ private void internalKillTest(boolean delayed) throws Exception { } } + private void startKillTestContainer(ContainerId cId, File processStartFile, File scriptFile) throws YarnException, IOException, InterruptedException { + FileUtil.setExecutable(scriptFile, true); + + ContainerLaunchContext containerLaunchContext = + recordFactory.newRecordInstance(ContainerLaunchContext.class); + + // upload the script file so that the container can run it + URL resource_alpha = ConverterUtils.getYarnUrlFromPath(localFS + .makeQualified(new Path(scriptFile.getAbsolutePath()))); + LocalResource rsrc_alpha = + recordFactory.newRecordInstance(LocalResource.class); + rsrc_alpha.setResource(resource_alpha); + rsrc_alpha.setSize(-1); + rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION); + rsrc_alpha.setType(LocalResourceType.FILE); + rsrc_alpha.setTimestamp(scriptFile.lastModified()); + String destinationFile = "dest_file.sh"; + Map localResources = + new HashMap(); + localResources.put(destinationFile, rsrc_alpha); + containerLaunchContext.setLocalResources(localResources); + + // set up the rest of the container + List commands = Arrays.asList(Shell.getRunScriptCommand(scriptFile)); + containerLaunchContext.setCommands(commands); + Token containerToken = createContainerToken(cId); + + StartContainerRequest scRequest = + StartContainerRequest.newInstance(containerLaunchContext, + containerToken); + List list = new ArrayList(); + list.add(scRequest); + StartContainersRequest allRequests = + StartContainersRequest.newInstance(list); + containerManager.startContainers(allRequests); + + int timeoutSecs = 0; + while (!processStartFile.exists() && timeoutSecs++ < 20) { + Thread.sleep(1000); + LOG.info("Waiting for process start-file to be created"); + } + Assert.assertTrue("ProcessStartFile doesn't exist!", + processStartFile.exists()); + } + @Test public void testDelayedKill() throws Exception { internalKillTest(true); @@ -767,4 +770,82 @@ protected Token createContainerToken(ContainerId cId) throws InvalidToken { return containerToken; } + public static class SleepTask { + public static void main(String[] args) throws Throwable { + if (args.length != 1) { + System.exit(1); + } + final int sleepTime = Integer.valueOf(args[0]); + System.out.printf("Sleeping for %d milliseconds\n", sleepTime); + Thread.sleep(sleepTime); + System.out.println("Exiting after sleeping"); + } + } + + + @Test + public void testThreadDumpKill() throws Exception { + containerManager.start(); + + File processStartFile = new File(tmpDir, "pid.txt").getAbsoluteFile(); + + // setup a script that can handle sigterm gracefully + File scriptFile = new File(tmpDir, "testscript.sh"); + PrintWriter writer = new PrintWriter( + new FileOutputStream(scriptFile)); + writer.println("#!/bin/bash\n\n"); + writer.println("exec $JAVA_HOME/bin/java -classpath " + + System.getProperty("java.class.path") + " " + + SleepTask.class.getName() + " 600000 >> " + processStartFile); + writer.close(); + FileUtil.setExecutable(scriptFile, true); + + // ////// Construct the Container-id + ApplicationId appId = ApplicationId.newInstance(1, 1); + ApplicationAttemptId appAttemptId = ApplicationAttemptId. + newInstance(appId, 1); + ContainerId cId = ContainerId.newInstance(appAttemptId, 0); + + startKillTestContainer(cId, processStartFile, scriptFile); + + // Now test the stop functionality. 
+ List containerIds = new ArrayList(); + containerIds.add(cId); + containerManager.stopContainers(StopContainersRequest.newInstance1( + Collections.singletonList( + StopContainerRequest.newInstance(cId, true)))); + + BaseContainerManagerTest.waitForContainerState(containerManager, cId, + ContainerState.COMPLETE); + + // if delayed container stop sends a sigterm followed by a sigkill + // otherwise sigkill is sent immediately + GetContainerStatusesRequest gcsRequest = + GetContainerStatusesRequest.newInstance(containerIds); + + ContainerStatus containerStatus = + containerManager.getContainerStatuses(gcsRequest) + .getContainerStatuses().get(0); + Assert.assertEquals(ExitCode.TERMINATED.getExitCode(), + containerStatus.getExitStatus()); + Assert.assertFalse("Process is still alive!", + DefaultContainerExecutor.containerIsAlive(cId.toString())); + + BufferedReader reader = + new BufferedReader(new FileReader(processStartFile)); + + boolean found = false; + while (true) { + String line = reader.readLine(); + if (line == null) { + break; + } + if (line.contains("Full thread dump")) { + found = true; + break; + } + } + Assert.assertTrue("Did not find \"Full thread dump\"", found); + reader.close(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java index c9e57a6..7989aee 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java @@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; +import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -218,59 +219,67 @@ synchronized public void checkResourceUsage() { resourceManager.getResourceScheduler().getNodeReport( this.nodeId).getUsedResource().getMemory()); } - + @Override - synchronized public StopContainersResponse stopContainers(StopContainersRequest request) + synchronized public StopContainersResponse stopContainers(StopContainersRequest request) throws YarnException { for (ContainerId containerID : request.getContainerIds()) { - String applicationId = - String.valueOf(containerID.getApplicationAttemptId() - .getApplicationId().getId()); - // Mark the container as COMPLETE - List applicationContainers = containers.get(containerID.getApplicationAttemptId() - .getApplicationId()); - for (Container c : applicationContainers) { - if (c.getId().compareTo(containerID) == 0) { - ContainerStatus containerStatus = containerStatusMap.get(c); - containerStatus.setState(ContainerState.COMPLETE); - containerStatusMap.put(c, containerStatus); - } - } + stopContainerInternal(containerID); + } + // process composite rpc, dumpThreads ignored + for (StopContainerRequest r : 
request.getStopRequests()) { + stopContainerInternal(r.getContainerId()); + } + return StopContainersResponse.newInstance(null,null); + } - // Send a heartbeat - try { - heartbeat(); - } catch (IOException ioe) { - throw RPCUtil.getRemoteException(ioe); + private void stopContainerInternal(ContainerId containerID) throws YarnException { + String applicationId = + String.valueOf(containerID.getApplicationAttemptId() + .getApplicationId().getId()); + // Mark the container as COMPLETE + List applicationContainers = containers.get(containerID.getApplicationAttemptId() + .getApplicationId()); + for (Container c : applicationContainers) { + if (c.getId().compareTo(containerID) == 0) { + ContainerStatus containerStatus = containerStatusMap.get(c); + containerStatus.setState(ContainerState.COMPLETE); + containerStatusMap.put(c, containerStatus); } + } - // Remove container and update status - int ctr = 0; - Container container = null; - for (Iterator i = applicationContainers.iterator(); i - .hasNext();) { - container = i.next(); - if (container.getId().compareTo(containerID) == 0) { - i.remove(); - ++ctr; - } - } + // Send a heartbeat + try { + heartbeat(); + } catch (IOException ioe) { + throw RPCUtil.getRemoteException(ioe); + } - if (ctr != 1) { - throw new IllegalStateException("Container " + containerID - + " stopped " + ctr + " times!"); + // Remove container and update status + int ctr = 0; + Container container = null; + for (Iterator i = applicationContainers.iterator(); i + .hasNext();) { + container = i.next(); + if (container.getId().compareTo(containerID) == 0) { + i.remove(); + ++ctr; } + } - Resources.addTo(available, container.getResource()); - Resources.subtractFrom(used, container.getResource()); + if (ctr != 1) { + throw new IllegalStateException("Container " + containerID + + " stopped " + ctr + " times!"); + } - if (LOG.isDebugEnabled()) { - LOG.debug("stopContainer:" + " node=" + containerManagerAddress - + " application=" + applicationId + " container=" + containerID - + " available=" + available + " used=" + used); - } + Resources.addTo(available, container.getResource()); + Resources.subtractFrom(used, container.getResource()); + + if (LOG.isDebugEnabled()) { + LOG.debug("stopContainer:" + " node=" + containerManagerAddress + + " application=" + applicationId + " container=" + containerID + + " available=" + available + " used=" + used); } - return StopContainersResponse.newInstance(null,null); } @Override