diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/StopContainerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/StopContainerRequest.java
new file mode 100644
index 0000000..b0d023e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/StopContainerRequest.java
@@ -0,0 +1,73 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.api.protocolrecords;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * The request sent by the ApplicationMaster to the
+ * NodeManager to stop a container, optionally requesting a
+ * thread dump of the container process before it is killed.
+ *
+ * @see ContainerManagementProtocol#stopContainers(StopContainersRequest)
+ */
+@Public
+@Unstable
+public abstract class StopContainerRequest {
+
+ @Public
+ @Unstable
+ public static StopContainerRequest newInstance(ContainerId containerId,
+ boolean dumpThreads) {
+ StopContainerRequest request =
+ Records.newRecord(StopContainerRequest.class);
+ request.setContainerId(containerId);
+ request.setDumpThreads(dumpThreads);
+ return request;
+ }
+
+ /**
+ * Get the ContainerId of the container to be stopped.
+ * @return ContainerId of container to be stopped
+ */
+ @Public
+ @Unstable
+ public abstract ContainerId getContainerId();
+
+ /**
+   * Set the ContainerId of the container to be stopped.
+ * @param containerId ContainerId of the container to be stopped
+ */
+ @Public
+ @Unstable
+ public abstract void setContainerId(ContainerId containerId);
+
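+  /**
+   * Get whether a thread dump of the container process should be taken
+   * before the container is stopped.
+   * @return true if a thread dump was requested
+   */
+  @Public
+  @Unstable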
+ public abstract boolean getDumpThreads();
+
+ /**
+   * Set whether a thread dump of the container process should be taken
+   * before the container is stopped.
+   * @param dumpThreads true to request a thread dump before stopping
+ */
+ @Public
+ @Unstable
+ public abstract void setDumpThreads(boolean dumpThreads);
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/StopContainersRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/StopContainersRequest.java
index 8ea186c..7350696 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/StopContainersRequest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/StopContainersRequest.java
@@ -22,6 +22,7 @@
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.util.Records;
@@ -45,6 +46,16 @@ public static StopContainersRequest newInstance(List<ContainerId> containerIds)
return request;
}
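+
+  /**
+   * Create a request from a list of StopContainerRequests, each of
+   * which can ask for a thread dump before its container is stopped.
+   */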
+ @Public
+ @Unstable
+ public static StopContainersRequest newInstance1(
+      List<StopContainerRequest> stopRequests) {
+ StopContainersRequest request =
+ Records.newRecord(StopContainersRequest.class);
+ request.setStopRequests(stopRequests);
+ return request;
+ }
+
/**
* Get the ContainerIds of the containers to be stopped.
* @return ContainerIds of containers to be stopped
@@ -60,4 +71,18 @@ public static StopContainersRequest newInstance(List<ContainerId> containerIds)
@Public
@Stable
  public abstract void setContainerIds(List<ContainerId> containerIds);
+
+  /**
+   * Get the list of StopContainerRequests, one per container to stop.
+   * @return list of StopContainerRequests
+   */
+  @Public
+  @Unstable
+  public abstract List<StopContainerRequest> getStopRequests();
+
+  /**
+   * Set the list of StopContainerRequests, one per container to stop.
+   * @param stopRequests list of StopContainerRequests
+   */
+  @Public
+  @Unstable
+  public abstract void setStopRequests(List<StopContainerRequest> stopRequests);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
index eff5cd7..06c31be 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
@@ -202,6 +202,7 @@ message StartContainerResponseProto {
message StopContainerRequestProto {
optional ContainerIdProto container_id = 1;
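+  // If true, the NodeManager sends SIGQUIT to the container process before
+  // stopping it so that JVM containers emit a thread dump.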
+ optional bool dump_threads = 2;
}
message StopContainerResponseProto {
@@ -233,6 +234,7 @@ message StartContainersResponseProto {
message StopContainersRequestProto {
repeated ContainerIdProto container_id = 1;
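+  // Composite form of the request: per-container stop requests that carry a
+  // dump_threads flag.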
+ repeated StopContainerRequestProto stop_requests = 2;
}
message StopContainersResponseProto {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/StopContainerRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/StopContainerRequestPBImpl.java
new file mode 100644
index 0000000..41a8d05
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/StopContainerRequestPBImpl.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
+
+
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
+import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.StopContainerRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.StopContainerRequestProtoOrBuilder;
+
+
+
+public class StopContainerRequestPBImpl extends StopContainerRequest {
+ StopContainerRequestProto proto = StopContainerRequestProto.getDefaultInstance();
+ StopContainerRequestProto.Builder builder = null;
+ boolean viaProto = false;
+
+ private ContainerId containerId = null;
+ private boolean dumpThreads;
+
+
+ public StopContainerRequestPBImpl() {
+ builder = StopContainerRequestProto.newBuilder();
+ }
+
+ public StopContainerRequestPBImpl(StopContainerRequestProto proto) {
+ this.proto = proto;
+ viaProto = true;
+ }
+
+ public StopContainerRequestProto getProto() {
+ mergeLocalToProto();
+ proto = viaProto ? proto : builder.build();
+ viaProto = true;
+ return proto;
+ }
+
+ private void mergeLocalToBuilder() {
+ if (this.containerId != null) {
+ builder.setContainerId(convertToProtoFormat(this.containerId));
+ }
+
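+    // dump_threads defaults to false; only set it when a dump was requested.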
+ if (this.dumpThreads) {
+ builder.setDumpThreads(this.dumpThreads);
+ }
+ }
+
+ private void mergeLocalToProto() {
+ if (viaProto)
+ maybeInitBuilder();
+ mergeLocalToBuilder();
+ proto = builder.build();
+ viaProto = true;
+ }
+
+ private void maybeInitBuilder() {
+ if (viaProto || builder == null) {
+ builder = StopContainerRequestProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+
+
+ @Override
+ public ContainerId getContainerId() {
+ StopContainerRequestProtoOrBuilder p = viaProto ? proto : builder;
+ if (this.containerId != null) {
+ return this.containerId;
+ }
+ if (!p.hasContainerId()) {
+ return null;
+ }
+ this.containerId = convertFromProtoFormat(p.getContainerId());
+ return this.containerId;
+ }
+
+ @Override
+ public void setContainerId(ContainerId containerId) {
+ maybeInitBuilder();
+ if (containerId == null)
+ builder.clearContainerId();
+ this.containerId = containerId;
+ }
+
+ @Override
+ public void setDumpThreads(boolean dumpThreads) {
+ maybeInitBuilder();
+ if (!dumpThreads) {
+ builder.clearDumpThreads();
+ }
+ this.dumpThreads = dumpThreads;
+ }
+
+ @Override
+ public boolean getDumpThreads() {
+ if (dumpThreads) {
+ return true;
+ }
+ StopContainerRequestProtoOrBuilder p = viaProto ? proto : builder;
+ return p.hasDumpThreads() ? p.getDumpThreads() : false;
+ }
+
+ private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) {
+ return new ContainerIdPBImpl(p);
+ }
+
+ private ContainerIdProto convertToProtoFormat(ContainerId t) {
+ return ((ContainerIdPBImpl)t).getProto();
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/StopContainersRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/StopContainersRequestPBImpl.java
index 27e092b..520a1a6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/StopContainersRequestPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/StopContainersRequestPBImpl.java
@@ -23,10 +23,12 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.StopContainerRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.StopContainersRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.StopContainersRequestProtoOrBuilder;
@@ -41,6 +43,7 @@
boolean viaProto = false;
  private List<ContainerId> containerIds = null;
+  private List<StopContainerRequest> stopRequests;
public StopContainersRequestPBImpl() {
builder = StopContainersRequestProto.newBuilder();
@@ -82,6 +85,9 @@ private void mergeLocalToBuilder() {
if (this.containerIds != null) {
addLocalContainerIdsToProto();
}
+ if (stopRequests != null) {
+ addLocalRequestsToProto();
+ }
}
private void mergeLocalToProto() {
@@ -123,6 +129,32 @@ private void initLocalContainerIds() {
}
}
+ private void addLocalRequestsToProto() {
+ maybeInitBuilder();
+ builder.clearStopRequests();
+ if (stopRequests == null) {
+ return;
+ }
+    List<StopContainerRequestProto> protoList =
+        new ArrayList<StopContainerRequestProto>(stopRequests.size());
+    for (StopContainerRequest request : stopRequests) {
+      protoList.add(convertToProtoFormat(request));
+ }
+ builder.addAllStopRequests(protoList);
+ }
+
+ private void initLocalStopRequests() {
+ if (stopRequests != null) {
+ return;
+ }
+ StopContainersRequestProtoOrBuilder p = viaProto ? proto : builder;
+    List<StopContainerRequestProto> requestProtos = p.getStopRequestsList();
+    stopRequests = new ArrayList<StopContainerRequest>(requestProtos.size());
+ for (StopContainerRequestProto r : requestProtos) {
+ stopRequests.add(convertFromProtoFormat(r));
+ }
+ }
+
@Override
  public List<ContainerId> getContainerIds() {
initLocalContainerIds();
@@ -137,6 +169,21 @@ public void setContainerIds(List<ContainerId> containerIds) {
this.containerIds = containerIds;
}
+ @Override
+  public List<StopContainerRequest> getStopRequests() {
+ initLocalStopRequests();
+ return stopRequests;
+ }
+
+ @Override
+  public void setStopRequests(List<StopContainerRequest> stopRequests) {
+ maybeInitBuilder();
+ if (stopRequests == null) {
+ builder.clearStopRequests();
+ }
+ this.stopRequests = stopRequests;
+ }
+
private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) {
return new ContainerIdPBImpl(p);
}
@@ -144,4 +191,14 @@ private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) {
private ContainerIdProto convertToProtoFormat(ContainerId t) {
return ((ContainerIdPBImpl) t).getProto();
}
+
+ private StopContainerRequestPBImpl convertFromProtoFormat(
+ StopContainerRequestProto p) {
+ return new StopContainerRequestPBImpl(p);
+ }
+
+ private StopContainerRequestProto convertToProtoFormat(
+ StopContainerRequest t) {
+ return ((StopContainerRequestPBImpl) t).getProto();
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index dd3deb3..000a9b2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -60,6 +60,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -700,7 +701,19 @@ public StopContainersResponse stopContainers(StopContainersRequest requests)
NMTokenIdentifier identifier = selectNMTokenIdentifier(remoteUgi);
for (ContainerId id : requests.getContainerIds()) {
try {
- stopContainerInternal(identifier, id);
+ stopContainerInternal(identifier, id, false);
+ succeededRequests.add(id);
+ } catch (YarnException e) {
+ failedRequests.put(id, SerializedException.newInstance(e));
+ }
+ }
+
+    // Process the composite form of the RPC, whose entries carry a
+    // per-container dumpThreads flag.
+ for (StopContainerRequest r : requests.getStopRequests()) {
+ final ContainerId id = r.getContainerId();
+ final boolean dumpThreads = r.getDumpThreads();
+ try {
+ stopContainerInternal(identifier, id, dumpThreads);
succeededRequests.add(id);
} catch (YarnException e) {
failedRequests.put(id, SerializedException.newInstance(e));
@@ -712,12 +725,12 @@ public StopContainersResponse stopContainers(StopContainersRequest requests)
@SuppressWarnings("unchecked")
private void stopContainerInternal(NMTokenIdentifier nmTokenIdentifier,
- ContainerId containerID) throws YarnException {
+ ContainerId containerID, boolean dumpThreads) throws YarnException {
String containerIDStr = containerID.toString();
Container container = this.context.getContainers().get(containerID);
LOG.info("Stopping container with container Id: " + containerIDStr);
authorizeGetAndStopContainerRequest(containerID, container, true,
- nmTokenIdentifier);
+ nmTokenIdentifier);
if (container == null) {
if (!nodeStatusUpdater.isContainerRecentlyStopped(containerID)) {
@@ -725,9 +738,8 @@ private void stopContainerInternal(NMTokenIdentifier nmTokenIdentifier,
+ " is not handled by this NodeManager");
}
} else {
- dispatcher.getEventHandler().handle(
- new ContainerKillEvent(containerID,
- "Container killed by the ApplicationMaster."));
+ dispatcher.getEventHandler().handle(new ContainerKillEvent(containerID,
+ "Container killed by the ApplicationMaster.", dumpThreads));
NMAuditLogger.logSuccess(container.getUser(),
AuditConstants.STOP_CONTAINER, "ContainerManageImpl", containerID
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
index 862e3fa..d364b6a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
@@ -127,6 +127,8 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher,
private static final ContainerDiagnosticsUpdateTransition UPDATE_DIAGNOSTICS_TRANSITION =
new ContainerDiagnosticsUpdateTransition();
+ private static final KillTransition KILL_TRANSITION = new KillTransition();
+
// State Machine for each container.
private static StateMachineFactory
@@ -191,7 +193,7 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher,
ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
UPDATE_DIAGNOSTICS_TRANSITION)
.addTransition(ContainerState.LOCALIZED, ContainerState.KILLING,
- ContainerEventType.KILL_CONTAINER, new KillTransition())
+ ContainerEventType.KILL_CONTAINER, KILL_TRANSITION)
// From RUNNING State
.addTransition(ContainerState.RUNNING,
@@ -206,7 +208,7 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher,
ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
UPDATE_DIAGNOSTICS_TRANSITION)
.addTransition(ContainerState.RUNNING, ContainerState.KILLING,
- ContainerEventType.KILL_CONTAINER, new KillTransition())
+ ContainerEventType.KILL_CONTAINER, KILL_TRANSITION)
.addTransition(ContainerState.RUNNING, ContainerState.EXITED_WITH_FAILURE,
ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
new KilledExternallyTransition())
@@ -796,11 +798,12 @@ public void transition(ContainerImpl container, ContainerEvent event) {
SingleArcTransition {
@Override
public void transition(ContainerImpl container, ContainerEvent event) {
+ ContainerKillEvent killEvent = (ContainerKillEvent) event;
// Kill the process/process-grp
container.dispatcher.getEventHandler().handle(
new ContainersLauncherEvent(container,
- ContainersLauncherEventType.CLEANUP_CONTAINER));
- ContainerKillEvent killEvent = (ContainerKillEvent) event;
+ ContainersLauncherEventType.CLEANUP_CONTAINER,
+ killEvent.getDumpThreads()));
container.diagnostics.append(killEvent.getDiagnostic()).append("\n");
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerKillEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerKillEvent.java
index 313b6a8..26fb78e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerKillEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerKillEvent.java
@@ -23,13 +23,24 @@
public class ContainerKillEvent extends ContainerEvent {
private final String diagnostic;
+ private final boolean dumpThreads;
public ContainerKillEvent(ContainerId cID, String diagnostic) {
+ this(cID, diagnostic, false);
+ }
+
+ public ContainerKillEvent(ContainerId cID, String diagnostic,
+ boolean dumpThreads) {
super(cID, ContainerEventType.KILL_CONTAINER);
this.diagnostic = diagnostic;
+ this.dumpThreads = dumpThreads;
}
public String getDiagnostic() {
return this.diagnostic;
}
+
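+  /**
+   * @return whether a thread dump was requested before the kill
+   */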
+ public boolean getDumpThreads() {
+ return dumpThreads;
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
index 8b08965..9dbc267 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
@@ -335,7 +335,7 @@ public Integer call() {
* @throws IOException
*/
@SuppressWarnings("unchecked") // dispatcher not typed
- public void cleanupContainer() throws IOException {
+ public void cleanupContainer(boolean dumpThreads) throws IOException {
ContainerId containerId = container.getContainerId();
String containerIdStr = ConverterUtils.toString(containerId);
LOG.info("Cleaning up container " + containerIdStr);
@@ -372,6 +372,15 @@ public void cleanupContainer() throws IOException {
// kill process
if (processId != null) {
String user = container.getUser();
+ boolean result;
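+      // If requested, send SIGQUIT first so a JVM container writes a
+      // thread dump to its stdout before it is killed.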
+ if (dumpThreads) {
+ result = exec.signalContainer(user, processId, Signal.QUIT);
+ if (!result && LOG.isDebugEnabled()) {
+ LOG.debug("Failed to send " + Signal.QUIT + " to " + processId
+ + " to generate JVM thread dump");
+ }
+ }
+
LOG.debug("Sending signal to pid " + processId
+ " as user " + user
+ " for container " + containerIdStr);
@@ -380,7 +389,7 @@ public void cleanupContainer() throws IOException {
? Signal.TERM
: Signal.KILL;
- boolean result = exec.signalContainer(user, processId, signal);
+ result = exec.signalContainer(user, processId, signal);
LOG.debug("Sent signal " + signal + " to pid " + processId
+ " as user " + user
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
index ce865e3..6ddbf49 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
@@ -24,7 +24,6 @@
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -32,21 +31,16 @@
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
-import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
import com.google.common.annotations.VisibleForTesting;
@@ -135,7 +129,7 @@ public void handle(ContainersLauncherEvent event) {
// Cleanup a container whether it is running/killed/completed, so that
// no sub-processes are alive.
try {
- launcher.cleanupContainer();
+ launcher.cleanupContainer(event.getDumpThreads());
} catch (IOException e) {
LOG.warn("Got exception while cleaning container " + containerId
+ ". Ignoring.");
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEvent.java
index 38bedf2..bbf0d1d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEvent.java
@@ -19,22 +19,33 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher;
import org.apache.hadoop.yarn.event.AbstractEvent;
-import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
public class ContainersLauncherEvent
    extends AbstractEvent<ContainersLauncherEventType>{
private final Container container;
+ private final boolean dumpThreads;
public ContainersLauncherEvent(Container container,
ContainersLauncherEventType eventType) {
+ this(container, eventType, false);
+ }
+
+ public ContainersLauncherEvent(Container container,
+ ContainersLauncherEventType eventType,
+ boolean dumpThreads) {
super(eventType);
this.container = container;
+ this.dumpThreads = dumpThreads;
}
public Container getContainer() {
return container;
}
+ public boolean getDumpThreads() {
+ return dumpThreads;
+ }
+
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java
index 81cf797..45a3c90 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java
@@ -21,7 +21,6 @@
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-import static org.mockito.Mockito.spy;
import java.io.BufferedReader;
import java.io.File;
@@ -52,6 +51,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -69,11 +69,11 @@
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent;
@@ -625,49 +625,7 @@ private void internalKillTest(boolean delayed) throws Exception {
writer.println("while true; do\nsleep 1s;\ndone");
}
writer.close();
- FileUtil.setExecutable(scriptFile, true);
-
- ContainerLaunchContext containerLaunchContext =
- recordFactory.newRecordInstance(ContainerLaunchContext.class);
-
- // upload the script file so that the container can run it
- URL resource_alpha =
- ConverterUtils.getYarnUrlFromPath(localFS
- .makeQualified(new Path(scriptFile.getAbsolutePath())));
- LocalResource rsrc_alpha =
- recordFactory.newRecordInstance(LocalResource.class);
- rsrc_alpha.setResource(resource_alpha);
- rsrc_alpha.setSize(-1);
- rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION);
- rsrc_alpha.setType(LocalResourceType.FILE);
- rsrc_alpha.setTimestamp(scriptFile.lastModified());
- String destinationFile = "dest_file.sh";
-    Map<String, LocalResource> localResources =
-        new HashMap<String, LocalResource>();
- localResources.put(destinationFile, rsrc_alpha);
- containerLaunchContext.setLocalResources(localResources);
-
- // set up the rest of the container
-    List<String> commands = Arrays.asList(Shell.getRunScriptCommand(scriptFile));
- containerLaunchContext.setCommands(commands);
- Token containerToken = createContainerToken(cId);
-
- StartContainerRequest scRequest =
- StartContainerRequest.newInstance(containerLaunchContext,
- containerToken);
-    List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
- list.add(scRequest);
- StartContainersRequest allRequests =
- StartContainersRequest.newInstance(list);
- containerManager.startContainers(allRequests);
-
- int timeoutSecs = 0;
- while (!processStartFile.exists() && timeoutSecs++ < 20) {
- Thread.sleep(1000);
- LOG.info("Waiting for process start-file to be created");
- }
- Assert.assertTrue("ProcessStartFile doesn't exist!",
- processStartFile.exists());
+ startKillTestContainer(cId, processStartFile, scriptFile);
// Now test the stop functionality.
    List<ContainerId> containerIds = new ArrayList<ContainerId>();
@@ -718,6 +676,51 @@ private void internalKillTest(boolean delayed) throws Exception {
}
}
+  private void startKillTestContainer(ContainerId cId, File processStartFile,
+      File scriptFile) throws YarnException, IOException,
+      InterruptedException {
+ FileUtil.setExecutable(scriptFile, true);
+
+ ContainerLaunchContext containerLaunchContext =
+ recordFactory.newRecordInstance(ContainerLaunchContext.class);
+
+ // upload the script file so that the container can run it
+ URL resource_alpha = ConverterUtils.getYarnUrlFromPath(localFS
+ .makeQualified(new Path(scriptFile.getAbsolutePath())));
+ LocalResource rsrc_alpha =
+ recordFactory.newRecordInstance(LocalResource.class);
+ rsrc_alpha.setResource(resource_alpha);
+ rsrc_alpha.setSize(-1);
+ rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION);
+ rsrc_alpha.setType(LocalResourceType.FILE);
+ rsrc_alpha.setTimestamp(scriptFile.lastModified());
+ String destinationFile = "dest_file.sh";
+    Map<String, LocalResource> localResources =
+        new HashMap<String, LocalResource>();
+ localResources.put(destinationFile, rsrc_alpha);
+ containerLaunchContext.setLocalResources(localResources);
+
+ // set up the rest of the container
+    List<String> commands = Arrays.asList(Shell.getRunScriptCommand(scriptFile));
+ containerLaunchContext.setCommands(commands);
+ Token containerToken = createContainerToken(cId);
+
+ StartContainerRequest scRequest =
+ StartContainerRequest.newInstance(containerLaunchContext,
+ containerToken);
+    List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
+ list.add(scRequest);
+ StartContainersRequest allRequests =
+ StartContainersRequest.newInstance(list);
+ containerManager.startContainers(allRequests);
+
+ int timeoutSecs = 0;
+ while (!processStartFile.exists() && timeoutSecs++ < 20) {
+ Thread.sleep(1000);
+ LOG.info("Waiting for process start-file to be created");
+ }
+ Assert.assertTrue("ProcessStartFile doesn't exist!",
+ processStartFile.exists());
+ }
+
@Test
public void testDelayedKill() throws Exception {
internalKillTest(true);
@@ -767,4 +770,82 @@ protected Token createContainerToken(ContainerId cId) throws InvalidToken {
return containerToken;
}
+ public static class SleepTask {
+ public static void main(String[] args) throws Throwable {
+ if (args.length != 1) {
+ System.exit(1);
+ }
+ final int sleepTime = Integer.valueOf(args[0]);
+ System.out.printf("Sleeping for %d milliseconds\n", sleepTime);
+ Thread.sleep(sleepTime);
+ System.out.println("Exiting after sleeping");
+ }
+ }
+
+
+ @Test
+ public void testThreadDumpKill() throws Exception {
+ containerManager.start();
+
+ File processStartFile = new File(tmpDir, "pid.txt").getAbsoluteFile();
+
+    // set up a script that execs a long-sleeping JVM and redirects its
+    // output, including any thread dump, to processStartFile
+ File scriptFile = new File(tmpDir, "testscript.sh");
+ PrintWriter writer = new PrintWriter(
+ new FileOutputStream(scriptFile));
+ writer.println("#!/bin/bash\n\n");
+ writer.println("exec $JAVA_HOME/bin/java -classpath "
+ + System.getProperty("java.class.path") + " \'"
+ + SleepTask.class.getName() + "\' 600000 >> " + processStartFile);
+ writer.close();
+ FileUtil.setExecutable(scriptFile, true);
+
+ // ////// Construct the Container-id
+ ApplicationId appId = ApplicationId.newInstance(1, 1);
+ ApplicationAttemptId appAttemptId = ApplicationAttemptId.
+ newInstance(appId, 1);
+ ContainerId cId = ContainerId.newInstance(appAttemptId, 0);
+
+ startKillTestContainer(cId, processStartFile, scriptFile);
+
+ // Now test the stop functionality.
+    List<ContainerId> containerIds = new ArrayList<ContainerId>();
+ containerIds.add(cId);
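+    // Stop via the composite request form so a thread dump is requested
+    // before the container is killed.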
+ containerManager.stopContainers(StopContainersRequest.newInstance1(
+ Collections.singletonList(
+ StopContainerRequest.newInstance(cId, true))));
+
+ BaseContainerManagerTest.waitForContainerState(containerManager, cId,
+ ContainerState.COMPLETE);
+
+    // The stop request asked for a thread dump, so SIGQUIT is sent first,
+    // followed by the termination signal.
+ GetContainerStatusesRequest gcsRequest =
+ GetContainerStatusesRequest.newInstance(containerIds);
+
+ ContainerStatus containerStatus =
+ containerManager.getContainerStatuses(gcsRequest)
+ .getContainerStatuses().get(0);
+ Assert.assertEquals(ExitCode.TERMINATED.getExitCode(),
+ containerStatus.getExitStatus());
+ Assert.assertFalse("Process is still alive!",
+ DefaultContainerExecutor.containerIsAlive(cId.toString()));
+
+ BufferedReader reader =
+ new BufferedReader(new FileReader(processStartFile));
+
+ boolean found = false;
+ while (true) {
+ String line = reader.readLine();
+ if (line == null) {
+ break;
+ }
+ if (line.contains("Full thread dump")) {
+ found = true;
+ break;
+ }
+ }
+ Assert.assertTrue("Did not find \"Full thread dump\"", found);
+ reader.close();
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java
index c9e57a6..7989aee 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java
@@ -36,6 +36,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -218,59 +219,67 @@ synchronized public void checkResourceUsage() {
resourceManager.getResourceScheduler().getNodeReport(
this.nodeId).getUsedResource().getMemory());
}
-
+
@Override
- synchronized public StopContainersResponse stopContainers(StopContainersRequest request)
+ synchronized public StopContainersResponse stopContainers(StopContainersRequest request)
throws YarnException {
for (ContainerId containerID : request.getContainerIds()) {
- String applicationId =
- String.valueOf(containerID.getApplicationAttemptId()
- .getApplicationId().getId());
- // Mark the container as COMPLETE
-      List<Container> applicationContainers = containers.get(containerID.getApplicationAttemptId()
-          .getApplicationId());
- for (Container c : applicationContainers) {
- if (c.getId().compareTo(containerID) == 0) {
- ContainerStatus containerStatus = containerStatusMap.get(c);
- containerStatus.setState(ContainerState.COMPLETE);
- containerStatusMap.put(c, containerStatus);
- }
- }
+ stopContainerInternal(containerID);
+ }
+    // Process the composite form of the RPC; this test stub ignores
+    // the dumpThreads flag.
+ for (StopContainerRequest r : request.getStopRequests()) {
+ stopContainerInternal(r.getContainerId());
+ }
+ return StopContainersResponse.newInstance(null,null);
+ }
- // Send a heartbeat
- try {
- heartbeat();
- } catch (IOException ioe) {
- throw RPCUtil.getRemoteException(ioe);
+  private void stopContainerInternal(ContainerId containerID)
+      throws YarnException {
+ String applicationId =
+ String.valueOf(containerID.getApplicationAttemptId()
+ .getApplicationId().getId());
+ // Mark the container as COMPLETE
+    List<Container> applicationContainers = containers.get(
+        containerID.getApplicationAttemptId().getApplicationId());
+ for (Container c : applicationContainers) {
+ if (c.getId().compareTo(containerID) == 0) {
+ ContainerStatus containerStatus = containerStatusMap.get(c);
+ containerStatus.setState(ContainerState.COMPLETE);
+ containerStatusMap.put(c, containerStatus);
}
+ }
- // Remove container and update status
- int ctr = 0;
- Container container = null;
-      for (Iterator<Container> i = applicationContainers.iterator(); i
- .hasNext();) {
- container = i.next();
- if (container.getId().compareTo(containerID) == 0) {
- i.remove();
- ++ctr;
- }
- }
+ // Send a heartbeat
+ try {
+ heartbeat();
+ } catch (IOException ioe) {
+ throw RPCUtil.getRemoteException(ioe);
+ }
- if (ctr != 1) {
- throw new IllegalStateException("Container " + containerID
- + " stopped " + ctr + " times!");
+ // Remove container and update status
+ int ctr = 0;
+ Container container = null;
+    for (Iterator<Container> i = applicationContainers.iterator(); i
+ .hasNext();) {
+ container = i.next();
+ if (container.getId().compareTo(containerID) == 0) {
+ i.remove();
+ ++ctr;
}
+ }
- Resources.addTo(available, container.getResource());
- Resources.subtractFrom(used, container.getResource());
+ if (ctr != 1) {
+ throw new IllegalStateException("Container " + containerID
+ + " stopped " + ctr + " times!");
+ }
- if (LOG.isDebugEnabled()) {
- LOG.debug("stopContainer:" + " node=" + containerManagerAddress
- + " application=" + applicationId + " container=" + containerID
- + " available=" + available + " used=" + used);
- }
+ Resources.addTo(available, container.getResource());
+ Resources.subtractFrom(used, container.getResource());
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("stopContainer:" + " node=" + containerManagerAddress
+ + " application=" + applicationId + " container=" + containerID
+ + " available=" + available + " used=" + used);
}
- return StopContainersResponse.newInstance(null,null);
}
@Override