commit f086dcdf8c3f3977affaf74ec844f39e003c0398 Author: Gera Shegalov Date: Thu May 8 16:18:07 2014 -0700 YARN-1515.v06.patch diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/StopContainerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/StopContainerRequest.java new file mode 100644 index 0000000..b0d023e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/StopContainerRequest.java @@ -0,0 +1,73 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.api.protocolrecords; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.ContainerManagementProtocol; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.util.Records; + +/** + *

The request sent by the ApplicationMaster to the + * NodeManager to stop containers.

+ * + * @see ContainerManagementProtocol#stopContainers(StopContainersRequest) + */ +@Public +@Unstable +public abstract class StopContainerRequest { + + @Public + @Unstable + public static StopContainerRequest newInstance(ContainerId containerId, + boolean dumpThreads) { + StopContainerRequest request = + Records.newRecord(StopContainerRequest.class); + request.setContainerId(containerId); + request.setDumpThreads(dumpThreads); + return request; + } + + /** + * Get the ContainerId of the container to be stopped. + * @return ContainerId of container to be stopped + */ + @Public + @Unstable + public abstract ContainerId getContainerId(); + + /** + * Set the ContainerIds of the container to be stopped. + * @param containerId ContainerId of the container to be stopped + */ + @Public + @Unstable + public abstract void setContainerId(ContainerId containerId); + + public abstract boolean getDumpThreads(); + + /** + * Set the DumpThreadss of the container to be stopped. + * @param dumpThreads DumpThreads of the container to be stopped + */ + @Public + @Unstable + public abstract void setDumpThreads(boolean dumpThreads); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/StopContainersRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/StopContainersRequest.java index 8ea186c..7350696 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/StopContainersRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/StopContainersRequest.java @@ -22,6 +22,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.util.Records; @@ -45,6 +46,16 @@ public static StopContainersRequest newInstance(List containerIds) return request; } + @Public + @Unstable + public static StopContainersRequest newInstance1( + List stopRequests) { + StopContainersRequest request = + Records.newRecord(StopContainersRequest.class); + request.setStopRequests(stopRequests); + return request; + } + /** * Get the ContainerIds of the containers to be stopped. * @return ContainerIds of containers to be stopped @@ -60,4 +71,18 @@ public static StopContainersRequest newInstance(List containerIds) @Public @Stable public abstract void setContainerIds(List containerIds); + + /** + * @return StopRequests + */ + @Public + @Unstable + public abstract List getStopRequests(); + + /** + * @param stopRequests StopRequests. + */ + @Public + @Stable + public abstract void setStopRequests(List stopRequests); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index a1f6d2e..fd96946 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -203,6 +203,7 @@ message StartContainerResponseProto { message StopContainerRequestProto { optional ContainerIdProto container_id = 1; + optional bool dump_threads = 2; } message StopContainerResponseProto { @@ -234,6 +235,7 @@ message StartContainersResponseProto { message StopContainersRequestProto { repeated ContainerIdProto container_id = 1; + repeated StopContainerRequestProto stop_requests = 2; } message StopContainersResponseProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/StopContainerRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/StopContainerRequestPBImpl.java new file mode 100644 index 0000000..41a8d05 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/StopContainerRequestPBImpl.java @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; + + +import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.StopContainerRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.StopContainerRequestProtoOrBuilder; + + + +public class StopContainerRequestPBImpl extends StopContainerRequest { + StopContainerRequestProto proto = StopContainerRequestProto.getDefaultInstance(); + StopContainerRequestProto.Builder builder = null; + boolean viaProto = false; + + private ContainerId containerId = null; + private boolean dumpThreads; + + + public StopContainerRequestPBImpl() { + builder = StopContainerRequestProto.newBuilder(); + } + + public StopContainerRequestPBImpl(StopContainerRequestProto proto) { + this.proto = proto; + viaProto = true; + } + + public StopContainerRequestProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + private void mergeLocalToBuilder() { + if (this.containerId != null) { + builder.setContainerId(convertToProtoFormat(this.containerId)); + } + + if (this.dumpThreads) { + builder.setDumpThreads(this.dumpThreads); + } + } + + private void mergeLocalToProto() { + if (viaProto) + maybeInitBuilder(); + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = StopContainerRequestProto.newBuilder(proto); + } + viaProto = false; + } + + + @Override + public ContainerId getContainerId() { + StopContainerRequestProtoOrBuilder p = viaProto ? proto : builder; + if (this.containerId != null) { + return this.containerId; + } + if (!p.hasContainerId()) { + return null; + } + this.containerId = convertFromProtoFormat(p.getContainerId()); + return this.containerId; + } + + @Override + public void setContainerId(ContainerId containerId) { + maybeInitBuilder(); + if (containerId == null) + builder.clearContainerId(); + this.containerId = containerId; + } + + @Override + public void setDumpThreads(boolean dumpThreads) { + maybeInitBuilder(); + if (!dumpThreads) { + builder.clearDumpThreads(); + } + this.dumpThreads = dumpThreads; + } + + @Override + public boolean getDumpThreads() { + if (dumpThreads) { + return true; + } + StopContainerRequestProtoOrBuilder p = viaProto ? proto : builder; + return p.hasDumpThreads() ? p.getDumpThreads() : false; + } + + private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) { + return new ContainerIdPBImpl(p); + } + + private ContainerIdProto convertToProtoFormat(ContainerId t) { + return ((ContainerIdPBImpl)t).getProto(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/StopContainersRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/StopContainersRequestPBImpl.java index 27e092b..520a1a6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/StopContainersRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/StopContainersRequestPBImpl.java @@ -23,10 +23,12 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.StopContainerRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.StopContainersRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.StopContainersRequestProtoOrBuilder; @@ -41,6 +43,7 @@ boolean viaProto = false; private List containerIds = null; + private List stopRequests; public StopContainersRequestPBImpl() { builder = StopContainersRequestProto.newBuilder(); @@ -82,6 +85,9 @@ private void mergeLocalToBuilder() { if (this.containerIds != null) { addLocalContainerIdsToProto(); } + if (stopRequests != null) { + addLocalRequestsToProto(); + } } private void mergeLocalToProto() { @@ -123,6 +129,32 @@ private void initLocalContainerIds() { } } + private void addLocalRequestsToProto() { + maybeInitBuilder(); + builder.clearStopRequests(); + if (stopRequests == null) { + return; + } + List protoList = + new ArrayList(stopRequests.size()); + for (StopContainerRequest requestProto : stopRequests) { + protoList.add(convertToProtoFormat(requestProto)); + } + builder.addAllStopRequests(protoList); + } + + private void initLocalStopRequests() { + if (stopRequests != null) { + return; + } + StopContainersRequestProtoOrBuilder p = viaProto ? proto : builder; + List requestProtos = p.getStopRequestsList(); + stopRequests = new ArrayList(requestProtos.size()); + for (StopContainerRequestProto r : requestProtos) { + stopRequests.add(convertFromProtoFormat(r)); + } + } + @Override public List getContainerIds() { initLocalContainerIds(); @@ -137,6 +169,21 @@ public void setContainerIds(List containerIds) { this.containerIds = containerIds; } + @Override + public List getStopRequests() { + initLocalStopRequests(); + return stopRequests; + } + + @Override + public void setStopRequests(List stopRequests) { + maybeInitBuilder(); + if (stopRequests == null) { + builder.clearStopRequests(); + } + this.stopRequests = stopRequests; + } + private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) { return new ContainerIdPBImpl(p); } @@ -144,4 +191,14 @@ private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) { private ContainerIdProto convertToProtoFormat(ContainerId t) { return ((ContainerIdPBImpl) t).getProto(); } + + private StopContainerRequestPBImpl convertFromProtoFormat( + StopContainerRequestProto p) { + return new StopContainerRequestPBImpl(p); + } + + private StopContainerRequestProto convertToProtoFormat( + StopContainerRequest t) { + return ((StopContainerRequestPBImpl) t).getProto(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index dd3deb3..000a9b2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -60,6 +60,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; +import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -700,7 +701,19 @@ public StopContainersResponse stopContainers(StopContainersRequest requests) NMTokenIdentifier identifier = selectNMTokenIdentifier(remoteUgi); for (ContainerId id : requests.getContainerIds()) { try { - stopContainerInternal(identifier, id); + stopContainerInternal(identifier, id, false); + succeededRequests.add(id); + } catch (YarnException e) { + failedRequests.put(id, SerializedException.newInstance(e)); + } + } + + // process composite rpc + for (StopContainerRequest r : requests.getStopRequests()) { + final ContainerId id = r.getContainerId(); + final boolean dumpThreads = r.getDumpThreads(); + try { + stopContainerInternal(identifier, id, dumpThreads); succeededRequests.add(id); } catch (YarnException e) { failedRequests.put(id, SerializedException.newInstance(e)); @@ -712,12 +725,12 @@ public StopContainersResponse stopContainers(StopContainersRequest requests) @SuppressWarnings("unchecked") private void stopContainerInternal(NMTokenIdentifier nmTokenIdentifier, - ContainerId containerID) throws YarnException { + ContainerId containerID, boolean dumpThreads) throws YarnException { String containerIDStr = containerID.toString(); Container container = this.context.getContainers().get(containerID); LOG.info("Stopping container with container Id: " + containerIDStr); authorizeGetAndStopContainerRequest(containerID, container, true, - nmTokenIdentifier); + nmTokenIdentifier); if (container == null) { if (!nodeStatusUpdater.isContainerRecentlyStopped(containerID)) { @@ -725,9 +738,8 @@ private void stopContainerInternal(NMTokenIdentifier nmTokenIdentifier, + " is not handled by this NodeManager"); } } else { - dispatcher.getEventHandler().handle( - new ContainerKillEvent(containerID, - "Container killed by the ApplicationMaster.")); + dispatcher.getEventHandler().handle(new ContainerKillEvent(containerID, + "Container killed by the ApplicationMaster.", dumpThreads)); NMAuditLogger.logSuccess(container.getUser(), AuditConstants.STOP_CONTAINER, "ContainerManageImpl", containerID diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index 50653f5..7d72d26 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -128,6 +128,8 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, private static final ContainerDiagnosticsUpdateTransition UPDATE_DIAGNOSTICS_TRANSITION = new ContainerDiagnosticsUpdateTransition(); + private static final KillTransition KILL_TRANSITION = new KillTransition(); + // State Machine for each container. private static StateMachineFactory @@ -192,7 +194,7 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, ContainerEventType.UPDATE_DIAGNOSTICS_MSG, UPDATE_DIAGNOSTICS_TRANSITION) .addTransition(ContainerState.LOCALIZED, ContainerState.KILLING, - ContainerEventType.KILL_CONTAINER, new KillTransition()) + ContainerEventType.KILL_CONTAINER, KILL_TRANSITION) // From RUNNING State .addTransition(ContainerState.RUNNING, @@ -207,7 +209,7 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, ContainerEventType.UPDATE_DIAGNOSTICS_MSG, UPDATE_DIAGNOSTICS_TRANSITION) .addTransition(ContainerState.RUNNING, ContainerState.KILLING, - ContainerEventType.KILL_CONTAINER, new KillTransition()) + ContainerEventType.KILL_CONTAINER, KILL_TRANSITION) .addTransition(ContainerState.RUNNING, ContainerState.EXITED_WITH_FAILURE, ContainerEventType.CONTAINER_KILLED_ON_REQUEST, new KilledExternallyTransition()) @@ -799,11 +801,12 @@ public void transition(ContainerImpl container, ContainerEvent event) { SingleArcTransition { @Override public void transition(ContainerImpl container, ContainerEvent event) { + ContainerKillEvent killEvent = (ContainerKillEvent) event; // Kill the process/process-grp container.dispatcher.getEventHandler().handle( new ContainersLauncherEvent(container, - ContainersLauncherEventType.CLEANUP_CONTAINER)); - ContainerKillEvent killEvent = (ContainerKillEvent) event; + ContainersLauncherEventType.CLEANUP_CONTAINER, + killEvent.getDumpThreads())); container.diagnostics.append(killEvent.getDiagnostic()).append("\n"); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerKillEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerKillEvent.java index 313b6a8..26fb78e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerKillEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerKillEvent.java @@ -23,13 +23,24 @@ public class ContainerKillEvent extends ContainerEvent { private final String diagnostic; + private final boolean dumpThreads; public ContainerKillEvent(ContainerId cID, String diagnostic) { + this(cID, diagnostic, false); + } + + public ContainerKillEvent(ContainerId cID, String diagnostic, + boolean dumpThreads) { super(cID, ContainerEventType.KILL_CONTAINER); this.diagnostic = diagnostic; + this.dumpThreads = dumpThreads; } public String getDiagnostic() { return this.diagnostic; } + + public boolean getDumpThreads() { + return dumpThreads; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java index e252e35..81bbde9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java @@ -352,7 +352,7 @@ public Integer call() { * @throws IOException */ @SuppressWarnings("unchecked") // dispatcher not typed - public void cleanupContainer() throws IOException { + public void cleanupContainer(boolean dumpThreads) throws IOException { ContainerId containerId = container.getContainerId(); String containerIdStr = ConverterUtils.toString(containerId); LOG.info("Cleaning up container " + containerIdStr); @@ -389,6 +389,15 @@ public void cleanupContainer() throws IOException { // kill process if (processId != null) { String user = container.getUser(); + boolean result; + if (dumpThreads) { + result = exec.signalContainer(user, processId, Signal.QUIT); + if (!result && LOG.isDebugEnabled()) { + LOG.debug("Failed to send " + Signal.QUIT + " to " + processId + + " to generate JVM thread dump"); + } + } + LOG.debug("Sending signal to pid " + processId + " as user " + user + " for container " + containerIdStr); @@ -397,7 +406,7 @@ public void cleanupContainer() throws IOException { ? Signal.TERM : Signal.KILL; - boolean result = exec.signalContainer(user, processId, signal); + result = exec.signalContainer(user, processId, signal); LOG.debug("Sent signal " + signal + " to pid " + processId + " as user " + user diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java index ce865e3..6ddbf49 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java @@ -24,7 +24,6 @@ import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Future; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -32,21 +31,16 @@ import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.service.AbstractService; -import org.apache.hadoop.util.Shell; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; -import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService; import com.google.common.annotations.VisibleForTesting; @@ -135,7 +129,7 @@ public void handle(ContainersLauncherEvent event) { // Cleanup a container whether it is running/killed/completed, so that // no sub-processes are alive. try { - launcher.cleanupContainer(); + launcher.cleanupContainer(event.getDumpThreads()); } catch (IOException e) { LOG.warn("Got exception while cleaning container " + containerId + ". Ignoring."); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEvent.java index 38bedf2..bbf0d1d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEvent.java @@ -19,22 +19,33 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher; import org.apache.hadoop.yarn.event.AbstractEvent; -import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; public class ContainersLauncherEvent extends AbstractEvent{ private final Container container; + private final boolean dumpThreads; public ContainersLauncherEvent(Container container, ContainersLauncherEventType eventType) { + this(container, eventType, false); + } + + public ContainersLauncherEvent(Container container, + ContainersLauncherEventType eventType, + boolean dumpThreads) { super(eventType); this.container = container; + this.dumpThreads = dumpThreads; } public Container getContainer() { return container; } + public boolean getDumpThreads() { + return dumpThreads; + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java index c8fc85a..1a3573b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java @@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -259,7 +260,7 @@ public void testInvalidEnvSyntaxDiagnostics() throws IOException { env.put( "APPLICATION_WORKFLOW_CONTEXT", "{\"workflowId\":\"609f91c5cd83\"," + "\"workflowName\":\"\n\ninsert table " + - "\npartition (cd_education_status)\nselect cd_demo_sk, cd_gender, " ); + "\npartition (cd_education_status)\nselect cd_demo_sk, cd_gender, "); List commands = new ArrayList(); ContainerLaunch.writeLaunchEnv(fos, env, resources, commands); fos.flush(); @@ -623,6 +624,52 @@ public void testAuxiliaryServiceHelper() throws Exception { AuxiliaryServiceHelper.getServiceDataFromEnv(serviceName, env)); } + private void startKillTestContainer(ContainerId cId, File processStartFile, + File scriptFile) throws Exception { + FileUtil.setExecutable(scriptFile, true); + + ContainerLaunchContext containerLaunchContext = + recordFactory.newRecordInstance(ContainerLaunchContext.class); + + // upload the script file so that the container can run it + URL resource_alpha = ConverterUtils.getYarnUrlFromPath(localFS + .makeQualified(new Path(scriptFile.getAbsolutePath()))); + LocalResource rsrc_alpha = + recordFactory.newRecordInstance(LocalResource.class); + rsrc_alpha.setResource(resource_alpha); + rsrc_alpha.setSize(-1); + rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION); + rsrc_alpha.setType(LocalResourceType.FILE); + rsrc_alpha.setTimestamp(scriptFile.lastModified()); + String destinationFile = "dest_file.sh"; + Map localResources = + new HashMap(); + localResources.put(destinationFile, rsrc_alpha); + containerLaunchContext.setLocalResources(localResources); + + // set up the rest of the container + List commands = Arrays.asList(Shell.getRunScriptCommand(scriptFile)); + containerLaunchContext.setCommands(commands); + Token containerToken = createContainerToken(cId); + + StartContainerRequest scRequest = + StartContainerRequest.newInstance(containerLaunchContext, + containerToken); + List list = new ArrayList(); + list.add(scRequest); + StartContainersRequest allRequests = + StartContainersRequest.newInstance(list); + containerManager.startContainers(allRequests); + + int timeoutSecs = 0; + while (!processStartFile.exists() && timeoutSecs++ < 20) { + Thread.sleep(1000); + LOG.info("Waiting for process start-file to be created"); + } + Assert.assertTrue("ProcessStartFile doesn't exist!", + processStartFile.exists()); + } + private void internalKillTest(boolean delayed) throws Exception { conf.setLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS, delayed ? 1000 : 0); @@ -655,50 +702,7 @@ private void internalKillTest(boolean delayed) throws Exception { writer.println("while true; do\nsleep 1s;\ndone"); } writer.close(); - FileUtil.setExecutable(scriptFile, true); - - ContainerLaunchContext containerLaunchContext = - recordFactory.newRecordInstance(ContainerLaunchContext.class); - - // upload the script file so that the container can run it - URL resource_alpha = - ConverterUtils.getYarnUrlFromPath(localFS - .makeQualified(new Path(scriptFile.getAbsolutePath()))); - LocalResource rsrc_alpha = - recordFactory.newRecordInstance(LocalResource.class); - rsrc_alpha.setResource(resource_alpha); - rsrc_alpha.setSize(-1); - rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION); - rsrc_alpha.setType(LocalResourceType.FILE); - rsrc_alpha.setTimestamp(scriptFile.lastModified()); - String destinationFile = "dest_file.sh"; - Map localResources = - new HashMap(); - localResources.put(destinationFile, rsrc_alpha); - containerLaunchContext.setLocalResources(localResources); - - // set up the rest of the container - List commands = Arrays.asList(Shell.getRunScriptCommand(scriptFile)); - containerLaunchContext.setCommands(commands); - Token containerToken = createContainerToken(cId); - - StartContainerRequest scRequest = - StartContainerRequest.newInstance(containerLaunchContext, - containerToken); - List list = new ArrayList(); - list.add(scRequest); - StartContainersRequest allRequests = - StartContainersRequest.newInstance(list); - containerManager.startContainers(allRequests); - - int timeoutSecs = 0; - while (!processStartFile.exists() && timeoutSecs++ < 20) { - Thread.sleep(1000); - LOG.info("Waiting for process start-file to be created"); - } - Assert.assertTrue("ProcessStartFile doesn't exist!", - processStartFile.exists()); - + startKillTestContainer(cId, processStartFile, scriptFile); // Now test the stop functionality. List containerIds = new ArrayList(); containerIds.add(cId); @@ -748,6 +752,84 @@ private void internalKillTest(boolean delayed) throws Exception { } } + public static class SleepTask { + public static void main(String[] args) throws Throwable { + if (args.length != 1) { + System.exit(1); + } + final int sleepTime = Integer.valueOf(args[0]); + System.out.printf("Sleeping for %d milliseconds\n", sleepTime); + Thread.sleep(sleepTime); + System.out.println("Exiting after sleeping"); + } + } + + @Test (timeout = 30000) + public void testThreadDumpKill() throws Exception { + containerManager.start(); + + File processStartFile = new File(tmpDir, "pid.txt").getAbsoluteFile(); + + // setup a script that can handle sigterm gracefully + File scriptFile = new File(tmpDir, "testscript.sh"); + PrintWriter writer = new PrintWriter( + new FileOutputStream(scriptFile)); + writer.println("#!/bin/bash\n\n"); + writer.println("exec $JAVA_HOME/bin/java -classpath " + + System.getProperty("java.class.path") + " \'" + + SleepTask.class.getName() + "\' 600000 >> " + processStartFile); + writer.close(); + FileUtil.setExecutable(scriptFile, true); + + // ////// Construct the Container-id + ApplicationId appId = ApplicationId.newInstance(1, 1); + ApplicationAttemptId appAttemptId = ApplicationAttemptId. + newInstance(appId, 1); + ContainerId cId = ContainerId.newInstance(appAttemptId, 0); + + startKillTestContainer(cId, processStartFile, scriptFile); + + // Now test the stop functionality. + List containerIds = new ArrayList(); + containerIds.add(cId); + containerManager.stopContainers(StopContainersRequest.newInstance1( + Collections.singletonList( + StopContainerRequest.newInstance(cId, true)))); + + BaseContainerManagerTest.waitForContainerState(containerManager, cId, + ContainerState.COMPLETE); + + // if delayed container stop sends a sigterm followed by a sigkill + // otherwise sigkill is sent immediately + GetContainerStatusesRequest gcsRequest = + GetContainerStatusesRequest.newInstance(containerIds); + + ContainerStatus containerStatus = + containerManager.getContainerStatuses(gcsRequest) + .getContainerStatuses().get(0); + Assert.assertEquals(ExitCode.TERMINATED.getExitCode(), + containerStatus.getExitStatus()); + Assert.assertFalse("Process is still alive!", + DefaultContainerExecutor.containerIsAlive(cId.toString())); + + BufferedReader reader = + new BufferedReader(new FileReader(processStartFile)); + + boolean found = false; + while (true) { + String line = reader.readLine(); + if (line == null) { + break; + } + if (line.contains("Full thread dump")) { + found = true; + break; + } + } + Assert.assertTrue("Did not find \"Full thread dump\"", found); + reader.close(); + } + @Test (timeout = 30000) public void testDelayedKill() throws Exception { internalKillTest(true); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java index a1c1a40..b28cf67 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java @@ -23,6 +23,7 @@ import java.nio.ByteBuffer; import java.security.PrivilegedAction; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; @@ -42,6 +43,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; +import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -72,23 +74,20 @@ private ContainerManagementProtocol containerMgrProxy; - private final RMAppAttempt application; private final Configuration conf; - private final AMLauncherEventType eventType; + private final AMLauncherEvent event; private final RMContext rmContext; private final Container masterContainer; - + @SuppressWarnings("rawtypes") private final EventHandler handler; - public AMLauncher(RMContext rmContext, RMAppAttempt application, - AMLauncherEventType eventType, Configuration conf) { - this.application = application; + public AMLauncher(RMContext rmContext, AMLauncherEvent event, Configuration conf) { this.conf = conf; - this.eventType = eventType; + this.event = event; this.rmContext = rmContext; this.handler = rmContext.getDispatcher().getEventHandler(); - this.masterContainer = application.getMasterContainer(); + this.masterContainer = event.getAppAttempt().getMasterContainer(); } private void connect() throws IOException { @@ -99,6 +98,7 @@ private void connect() throws IOException { private void launch() throws IOException, YarnException { connect(); + final RMAppAttempt application = event.getAppAttempt(); ContainerId masterContainerID = masterContainer.getId(); ApplicationSubmissionContext applicationContext = application.getSubmissionContext(); @@ -128,15 +128,14 @@ private void launch() throws IOException, YarnException { } } - private void cleanup() throws IOException, YarnException { + private void cleanup(boolean dumpThreads) throws IOException, YarnException { connect(); ContainerId containerId = masterContainer.getId(); - List containerIds = new ArrayList(); - containerIds.add(containerId); - StopContainersRequest stopRequest = - StopContainersRequest.newInstance(containerIds); + StopContainerRequest stopRequest = + StopContainerRequest.newInstance(containerId, dumpThreads); StopContainersResponse response = - containerMgrProxy.stopContainers(stopRequest); + containerMgrProxy.stopContainers(StopContainersRequest.newInstance1( + Collections.singletonList(stopRequest))); if (response.getFailedRequests() != null && response.getFailedRequests().containsKey(containerId)) { Throwable t = response.getFailedRequests().get(containerId).deSerialize(); @@ -148,7 +147,7 @@ private void cleanup() throws IOException, YarnException { protected ContainerManagementProtocol getContainerMgrProxy( final ContainerId containerId) { - final NodeId node = masterContainer.getNodeId(); + final NodeId node = event.getAppAttempt().getMasterContainer().getNodeId(); final InetSocketAddress containerManagerBindAddress = NetUtils.createSocketAddrForHost(node.getHost(), node.getPort()); @@ -202,6 +201,7 @@ private ContainerLaunchContext createAMContainerLaunchContext( private void setupTokens( ContainerLaunchContext container, ContainerId containerID) throws IOException { + final RMAppAttempt application = event.getAppAttempt(); Map environment = container.getEnvironment(); environment.put(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV, application.getWebProxyBase()); @@ -237,12 +237,13 @@ private void setupTokens( @VisibleForTesting protected Token getAMRMToken() { - return application.getAMRMToken(); + return event.getAppAttempt().getAMRMToken(); } @SuppressWarnings("unchecked") public void run() { - switch (eventType) { + final RMAppAttempt application = event.getAppAttempt(); + switch (event.getType()) { case LAUNCH: try { LOG.info("Launching master" + application.getAppAttemptId()); @@ -260,7 +261,7 @@ public void run() { case CLEANUP: try { LOG.info("Cleaning master " + application.getAppAttemptId()); - cleanup(); + cleanup(event.getDumpThreads()); } catch(IOException ie) { LOG.info("Error cleaning master ", ie); } catch (YarnException e) { @@ -274,7 +275,7 @@ public void run() { } break; default: - LOG.warn("Received unknown event-type " + eventType + ". Ignoring."); + LOG.warn("Received unknown event-type " + event.getType() + ". Ignoring."); break; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncherEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncherEvent.java index c79dbbc..a574b94 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncherEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncherEvent.java @@ -24,14 +24,25 @@ public class AMLauncherEvent extends AbstractEvent { private final RMAppAttempt appAttempt; + private final boolean dumpThreads; public AMLauncherEvent(AMLauncherEventType type, RMAppAttempt appAttempt) { + this(type, appAttempt, false); + } + + public AMLauncherEvent(AMLauncherEventType type, RMAppAttempt appAttempt, + boolean dumpThreads) { super(type); this.appAttempt = appAttempt; + this.dumpThreads = dumpThreads; } public RMAppAttempt getAppAttempt() { return this.appAttempt; } + public boolean getDumpThreads() { + return dumpThreads; + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/ApplicationMasterLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/ApplicationMasterLauncher.java index af02b19..17e616c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/ApplicationMasterLauncher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/ApplicationMasterLauncher.java @@ -56,16 +56,15 @@ protected void serviceStart() throws Exception { super.serviceStart(); } - protected Runnable createRunnableLauncher(RMAppAttempt application, - AMLauncherEventType event) { + protected Runnable createRunnableLauncher(AMLauncherEvent event) { Runnable launcher = - new AMLauncher(context, application, event, getConfig()); + new AMLauncher(context, event, getConfig()); return launcher; } private void launch(RMAppAttempt application) { - Runnable launcher = createRunnableLauncher(application, - AMLauncherEventType.LAUNCH); + Runnable launcher = createRunnableLauncher( + new AMLauncherEvent(AMLauncherEventType.LAUNCH, application)); masterEvents.add(launcher); } @@ -102,8 +101,8 @@ public void run() { } } - private void cleanup(RMAppAttempt application) { - Runnable launcher = createRunnableLauncher(application, AMLauncherEventType.CLEANUP); + private void cleanup(AMLauncherEvent event) { + Runnable launcher = createRunnableLauncher(event); masterEvents.add(launcher); } @@ -116,7 +115,7 @@ public synchronized void handle(AMLauncherEvent appEvent) { launch(application); break; case CLEANUP: - cleanup(application); + cleanup(appEvent); default: break; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index efe0721..5633e59 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -1269,7 +1269,8 @@ public void transition(RMAppAttemptImpl appAttempt, if(!appAttempt.submissionContext.getUnmanagedAM()) { // Tell the launcher to cleanup. appAttempt.eventHandler.handle(new AMLauncherEvent( - AMLauncherEventType.CLEANUP, appAttempt)); + AMLauncherEventType.CLEANUP, appAttempt, + this instanceof ExpiredTransition)); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRMWithCustomAMLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRMWithCustomAMLauncher.java index 0ea2b5e..b6f48f0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRMWithCustomAMLauncher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRMWithCustomAMLauncher.java @@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncher; +import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; @@ -50,9 +51,8 @@ public MockRMWithCustomAMLauncher(Configuration conf, protected ApplicationMasterLauncher createAMLauncher() { return new ApplicationMasterLauncher(getRMContext()) { @Override - protected Runnable createRunnableLauncher(RMAppAttempt application, - AMLauncherEventType event) { - return new AMLauncher(context, application, event, getConfig()) { + protected Runnable createRunnableLauncher(AMLauncherEvent event) { + return new AMLauncher(context, event, getConfig()) { @Override protected ContainerManagementProtocol getContainerMgrProxy( ContainerId containerId) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java index d8d474e..085e929 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java @@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; +import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -218,59 +219,67 @@ synchronized public void checkResourceUsage() { resourceManager.getResourceScheduler().getNodeReport( this.nodeId).getUsedResource().getMemory()); } - + @Override - synchronized public StopContainersResponse stopContainers(StopContainersRequest request) + synchronized public StopContainersResponse stopContainers(StopContainersRequest request) throws YarnException { for (ContainerId containerID : request.getContainerIds()) { - String applicationId = - String.valueOf(containerID.getApplicationAttemptId() - .getApplicationId().getId()); - // Mark the container as COMPLETE - List applicationContainers = containers.get(containerID.getApplicationAttemptId() - .getApplicationId()); - for (Container c : applicationContainers) { - if (c.getId().compareTo(containerID) == 0) { - ContainerStatus containerStatus = containerStatusMap.get(c); - containerStatus.setState(ContainerState.COMPLETE); - containerStatusMap.put(c, containerStatus); - } - } + stopContainerInternal(containerID); + } + // process composite rpc, dumpThreads ignored + for (StopContainerRequest r : request.getStopRequests()) { + stopContainerInternal(r.getContainerId()); + } + return StopContainersResponse.newInstance(null,null); + } - // Send a heartbeat - try { - heartbeat(); - } catch (IOException ioe) { - throw RPCUtil.getRemoteException(ioe); + private void stopContainerInternal(ContainerId containerID) throws YarnException { + String applicationId = + String.valueOf(containerID.getApplicationAttemptId() + .getApplicationId().getId()); + // Mark the container as COMPLETE + List applicationContainers = containers.get(containerID.getApplicationAttemptId() + .getApplicationId()); + for (Container c : applicationContainers) { + if (c.getId().compareTo(containerID) == 0) { + ContainerStatus containerStatus = containerStatusMap.get(c); + containerStatus.setState(ContainerState.COMPLETE); + containerStatusMap.put(c, containerStatus); } + } - // Remove container and update status - int ctr = 0; - Container container = null; - for (Iterator i = applicationContainers.iterator(); i - .hasNext();) { - container = i.next(); - if (container.getId().compareTo(containerID) == 0) { - i.remove(); - ++ctr; - } - } + // Send a heartbeat + try { + heartbeat(); + } catch (IOException ioe) { + throw RPCUtil.getRemoteException(ioe); + } - if (ctr != 1) { - throw new IllegalStateException("Container " + containerID - + " stopped " + ctr + " times!"); + // Remove container and update status + int ctr = 0; + Container container = null; + for (Iterator i = applicationContainers.iterator(); i + .hasNext();) { + container = i.next(); + if (container.getId().compareTo(containerID) == 0) { + i.remove(); + ++ctr; } + } - Resources.addTo(available, container.getResource()); - Resources.subtractFrom(used, container.getResource()); + if (ctr != 1) { + throw new IllegalStateException("Container " + containerID + + " stopped " + ctr + " times!"); + } - if (LOG.isDebugEnabled()) { - LOG.debug("stopContainer:" + " node=" + containerManagerAddress - + " application=" + applicationId + " container=" + containerID - + " available=" + available + " used=" + used); - } + Resources.addTo(available, container.getResource()); + Resources.subtractFrom(used, container.getResource()); + + if (LOG.isDebugEnabled()) { + LOG.debug("stopContainer:" + " node=" + containerManagerAddress + + " application=" + applicationId + " container=" + containerID + + " available=" + available + " used=" + used); } - return StopContainersResponse.newInstance(null,null); } @Override