]%n%n");
ToolRunner.printGenericCommandUsage(System.out);
}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java
index d0ce704..54e862a 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java
@@ -71,6 +71,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ClientToken;
+import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
@@ -501,4 +502,33 @@ public LogParams getLogFilePath(JobID oldJobID, TaskAttemptID oldTaskAttemptID)
throw new IOException("Cannot get log path for a in-progress job");
}
}
+
+ // This is only used to get the container id of a running taskattempt
+ // If the application has finished, we don't need to ask HS
+ public ContainerId getContainerId(TaskAttemptID taskAttemptID)
+ throws YarnRemoteException, IOException {
+ ApplicationReport application = rm.getApplicationReport(appId);
+ if (application == null ||
+ YarnApplicationState.RUNNING != application.getYarnApplicationState()) {
+ throw new IOException("application isn't running: " + appId);
+ }
+
+ if (taskAttemptID != null) {
+ GetTaskAttemptReportRequest taRequest =
+ recordFactory.newRecordInstance(GetTaskAttemptReportRequest.class);
+ taRequest.setTaskAttemptId(TypeConverter.toYarn(taskAttemptID));
+ TaskAttemptReport taReport =
+ ((GetTaskAttemptReportResponse) invoke("getTaskAttemptReport",
+ GetTaskAttemptReportRequest.class, taRequest))
+ .getTaskAttemptReport();
+ if (taReport.getContainerId() == null
+ || taReport.getNodeManagerHost() == null) {
+ throw new IOException("Unable to get log information for task: "
+ + taskAttemptID);
+ }
+ return taReport.getContainerId();
+ }
+ return null;
+ }
+
}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
index 9bb08fb..079b135 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
@@ -72,6 +72,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerLogAggregationPolicy;
import org.apache.hadoop.yarn.api.records.DelegationToken;
@@ -642,6 +643,12 @@ public LogParams getLogFileParams(JobID jobID, TaskAttemptID taskAttemptID)
return clientCache.getClient(jobID).getLogFilePath(jobID, taskAttemptID);
}
+ @Override
+ public ContainerId getContainerId(JobID jobID, TaskAttemptID taskAttemptID)
+ throws IOException, InterruptedException {
+ return clientCache.getClient(jobID).getContainerId(taskAttemptID);
+ }
+
private static void warnForJavaLibPath(String opts, String component,
String javaConf, String envConf) {
if (opts != null && opts.contains("-Djava.library.path")) {
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java
index 503d188..ce3d0d7 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java
@@ -91,6 +91,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -355,6 +357,13 @@ public CancelDelegationTokenResponse cancelDelegationToken(
CancelDelegationTokenRequest request) throws YarnRemoteException {
return null;
}
+
+ @Override
+ public SignalContainerResponse signalContainer(
+ SignalContainerRequest request) throws YarnRemoteException {
+ return null;
+ }
+
}
class HistoryService extends AMService implements HSClientProtocol {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ClientRMProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ClientRMProtocol.java
index 8a19dad..15bceb8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ClientRMProtocol.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ClientRMProtocol.java
@@ -43,6 +43,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -291,4 +293,29 @@ public RenewDelegationTokenResponse renewDelegationToken(
@Private
public CancelDelegationTokenResponse cancelDelegationToken(
CancelDelegationTokenRequest request) throws YarnRemoteException;
+
+ /**
+ * The interface used by clients to request the
+ * ResourceManager to signal a container. For example,
+ * the client can send signal 3 to dump threads of the container.
+ *
+ * The client, via {@link SignalContainerRequest} provides the
+ * id of the container and the signal number.
+ *
+ * In secure mode,the ResourceManager verifies access to the
+ * application before signaling the container.
+ * The user needs to have MODIFY_APP permission.
+ *
+ * Currently, the ResourceManager returns an empty response
+ * on success and throws an exception on rejecting the request.
+ *
+ * @param request request to signal a container
+ * @return ResourceManager returns an empty response
+ * on success and throws an exception on rejecting the request
+ * @throws YarnRemoteException
+ * @see #getQueueUserAcls(GetQueueUserAclsInfoRequest)
+ */
+ public SignalContainerResponse signalContainer(
+ SignalContainerRequest request) throws YarnRemoteException;
+
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SignalContainerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SignalContainerRequest.java
new file mode 100644
index 0000000..9e1d41f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SignalContainerRequest.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.protocolrecords;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Stable;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.ClientRMProtocol;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+/**
+ * The request sent by the client to the ResourceManager
+ * to signal a container.
+ *
+ * The request includes the {@link ContainerId} of the container to be
+ * signal.
+ *
+ * @see ClientRMProtocol#signalContainer(SignalContainerRequest)
+ */
+@Public
+@Stable
+public interface SignalContainerRequest {
+ /**
+ * Get the ContainerId of the container to signal.
+ * @return ContainerId of the container to signal.
+ */
+ @Public
+ @Stable
+ public abstract ContainerId getContainerId();
+
+ @Public
+ @Stable
+ public abstract void setContainerId(ContainerId containerId);
+
+ @Public
+ @Stable
+ public abstract int getSignal();
+
+ @Public
+ @Stable
+ public abstract void setSignal(int signal);
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SignalContainerResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SignalContainerResponse.java
new file mode 100644
index 0000000..1c95c0e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SignalContainerResponse.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.protocolrecords;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Stable;
+import org.apache.hadoop.yarn.api.ClientRMProtocol;
+
+/**
+ * The response sent by the ResourceManager to the client
+ * signalling a container.
+ *
+ * Currently it's empty.
+ *
+ * @see ClientRMProtocol#signalContainer(SignalContainerRequest)
+ */
+@Public
+@Stable
+public interface SignalContainerResponse {
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SignalContainerRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SignalContainerRequestPBImpl.java
new file mode 100644
index 0000000..0e791ae
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SignalContainerRequestPBImpl.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
+
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ProtoBase;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProtoOrBuilder;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
+import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
+
+
+public class SignalContainerRequestPBImpl
+ extends ProtoBase implements SignalContainerRequest {
+ SignalContainerRequestProto proto = SignalContainerRequestProto.getDefaultInstance();
+ SignalContainerRequestProto.Builder builder = null;
+ boolean viaProto = false;
+
+ private ContainerId containerId;
+
+ public SignalContainerRequestPBImpl() {
+ builder = SignalContainerRequestProto.newBuilder();
+ }
+
+ public SignalContainerRequestPBImpl(SignalContainerRequestProto proto) {
+ this.proto = proto;
+ viaProto = true;
+ }
+
+ public SignalContainerRequestProto getProto() {
+ mergeLocalToProto();
+ proto = viaProto ? proto : builder.build();
+ viaProto = true;
+ return proto;
+ }
+
+ private void mergeLocalToBuilder() {
+ if (this.containerId != null) {
+ builder.setContainerId(convertToProtoFormat(this.containerId));
+ }
+ }
+
+ private void mergeLocalToProto() {
+ if (viaProto)
+ maybeInitBuilder();
+ mergeLocalToBuilder();
+ proto = builder.build();
+ viaProto = true;
+ }
+
+ private void maybeInitBuilder() {
+ if (viaProto || builder == null) {
+ builder = SignalContainerRequestProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+
+
+ @Override
+ public ContainerId getContainerId() {
+ SignalContainerRequestProtoOrBuilder p = viaProto ? proto : builder;
+ if (this.containerId != null) {
+ return this.containerId;
+ }
+ if (!p.hasContainerId()) {
+ return null;
+ }
+ this.containerId = convertFromProtoFormat(p.getContainerId());
+ return this.containerId;
+ }
+
+ @Override
+ public void setContainerId(ContainerId containerId) {
+ maybeInitBuilder();
+ if (containerId == null)
+ builder.clearContainerId();
+ this.containerId = containerId;
+ }
+
+ @Override
+ public int getSignal() {
+ SignalContainerRequestProtoOrBuilder p = viaProto ? proto : builder;
+ return p.getSignal();
+
+ }
+
+ @Override
+ public void setSignal(int signal) {
+ maybeInitBuilder();
+ builder.setSignal(signal);
+ }
+
+ private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) {
+ return new ContainerIdPBImpl(p);
+ }
+
+ private ContainerIdProto convertToProtoFormat(ContainerId t) {
+ return ((ContainerIdPBImpl)t).getProto();
+ }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SignalContainerResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SignalContainerResponsePBImpl.java
new file mode 100644
index 0000000..bcf5a03
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SignalContainerResponsePBImpl.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
+
+
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
+import org.apache.hadoop.yarn.api.records.ProtoBase;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerResponseProto;
+
+
+
+public class SignalContainerResponsePBImpl
+ extends ProtoBase implements SignalContainerResponse {
+ SignalContainerResponseProto proto = SignalContainerResponseProto.getDefaultInstance();
+ SignalContainerResponseProto.Builder builder = null;
+ boolean viaProto = false;
+
+ public SignalContainerResponsePBImpl() {
+ builder = SignalContainerResponseProto.newBuilder();
+ }
+
+ public SignalContainerResponsePBImpl(SignalContainerResponseProto proto) {
+ this.proto = proto;
+ viaProto = true;
+ }
+
+ public SignalContainerResponseProto getProto() {
+ proto = viaProto ? proto : builder.build();
+ viaProto = true;
+ return proto;
+ }
+
+ private void maybeInitBuilder() {
+ if (viaProto || builder == null) {
+ builder = SignalContainerResponseProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/client_RM_protocol.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/client_RM_protocol.proto
index 5aa2380..1a9cdf5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/client_RM_protocol.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/client_RM_protocol.proto
@@ -37,5 +37,6 @@ service ClientRMProtocolService {
rpc getDelegationToken(hadoop.common.GetDelegationTokenRequestProto) returns (hadoop.common.GetDelegationTokenResponseProto);
rpc renewDelegationToken(hadoop.common.RenewDelegationTokenRequestProto) returns (hadoop.common.RenewDelegationTokenResponseProto);
rpc cancelDelegationToken(hadoop.common.CancelDelegationTokenRequestProto) returns (hadoop.common.CancelDelegationTokenResponseProto);
+ rpc signalContainer(SignalContainerRequestProto) returns (SignalContainerResponseProto);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
index b073299..2f346d5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
@@ -141,6 +141,14 @@ message GetQueueUserAclsInfoResponseProto {
repeated QueueUserACLInfoProto queueUserAcls = 1;
}
+message SignalContainerRequestProto {
+ optional ContainerIdProto container_id = 1;
+ optional int32 signal = 2;
+}
+
+message SignalContainerResponseProto {
+}
+
//////////////////////////////////////////////////////
/////// client_NM_Protocol ///////////////////////////
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClient.java
index edc35a9..dc135fc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClient.java
@@ -27,6 +27,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.DelegationToken;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.QueueInfo;
@@ -236,4 +237,19 @@ ApplicationReport getApplicationReport(ApplicationId appId)
* @throws YarnRemoteException
*/
List getQueueAclsInfo() throws YarnRemoteException;
+
+ /**
+ *
+ * Signal a container identified by given ID.
+ *
+ *
+ * @param containerId
+ * {@link ContainerId} of the container that needs to be signaled
+ * @param signal the signal number
+ * @throws YarnRemoteException
+ * in case of errors or if YARN rejects the request due to
+ * access-control restrictions.
+ */
+ void signalContainer(ContainerId containerId, int signal) throws YarnRemoteException;
+
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClientImpl.java
index 5ee3cd7..27e0584 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClientImpl.java
@@ -45,10 +45,12 @@
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.DelegationToken;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.QueueInfo;
@@ -278,4 +280,15 @@ private void getChildQueues(QueueInfo parent, List queues,
}
}
}
+
+ @Override
+ public void signalContainer(ContainerId containerId, int signal)
+ throws YarnRemoteException {
+ LOG.info("Signalling container " + containerId);
+ SignalContainerRequest request =
+ Records.newRecord(SignalContainerRequest.class);
+ request.setContainerId(containerId);
+ request.setSignal(signal);
+ rmClient.signalContainer(request);
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java
index 7156b43..997a888 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java
@@ -30,6 +30,7 @@
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -57,6 +58,11 @@ public int run(String[] args) throws Exception {
killOption.setArgs(2);
killOption.setArgName("Application ID [reason]");
opts.addOption(killOption);
+ Option signalOption = new Option(SIGNAL_CMD, true,
+ "Signal the container. Default signal number is 3.");
+ signalOption.setArgs(2);
+ signalOption.setArgName("container ID [signal number]");
+ opts.addOption(signalOption);
CommandLine cliParser = new GnuParser().parse(opts, args);
@@ -76,6 +82,18 @@ public int run(String[] args) throws Exception {
}
final String[] killArgs = cliParser.getOptionValues(KILL_CMD);
killApplication(killArgs[0], killArgs.length == 2 ? killArgs[1] : null);
+ } else if (cliParser.hasOption(SIGNAL_CMD)) {
+ if (args.length < 2 || args.length > 3) {
+ printUsage(opts);
+ return exitCode;
+ }
+ final String[] signalArgs = cliParser.getOptionValues(SIGNAL_CMD);
+ final String containerId = signalArgs[0];
+ int signal = 3;
+ if (signalArgs.length == 2) {
+ signal = Integer.parseInt(signalArgs[1]);
+ }
+ signalContainer(containerId, signal);
} else {
syserr.println("Invalid Command Usage : ");
printUsage(opts);
@@ -129,6 +147,20 @@ private void killApplication(String applicationId, String reason)
}
/**
+ * Signals the containerId
+ *
+ * @param containerIdStr the container id
+ * @param signal the signal number
+ * @throws YarnRemoteException
+ */
+ private void signalContainer(String containerIdStr, int signal)
+ throws YarnRemoteException {
+ ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
+ sysout.println("Signalling container " + containerIdStr);
+ client.signalContainer(containerId, signal);
+ }
+
+ /**
* Prints the application report for an application id.
*
* @param applicationId
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/YarnCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/YarnCLI.java
index a36e671..ee18fee 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/YarnCLI.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/YarnCLI.java
@@ -30,6 +30,7 @@
public static final String STATUS_CMD = "status";
public static final String LIST_CMD = "list";
public static final String KILL_CMD = "kill";
+ public static final String SIGNAL_CMD = "signal";
protected PrintStream sysout;
protected PrintStream syserr;
protected YarnClient client;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ClientRMProtocolPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ClientRMProtocolPBClientImpl.java
index 25212b8..9032684 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ClientRMProtocolPBClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ClientRMProtocolPBClientImpl.java
@@ -54,6 +54,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.CancelDelegationTokenRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.CancelDelegationTokenResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetAllApplicationsRequestPBImpl;
@@ -76,6 +78,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.KillApplicationResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RenewDelegationTokenRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RenewDelegationTokenResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationResponsePBImpl;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
@@ -88,6 +92,7 @@
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetQueueInfoRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetQueueUserAclsInfoRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.KillApplicationRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationRequestProto;
import com.google.protobuf.ServiceException;
@@ -268,4 +273,17 @@ public CancelDelegationTokenResponse cancelDelegationToken(
throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e);
}
}
+
+ @Override
+ public SignalContainerResponse signalContainer(
+ SignalContainerRequest request) throws YarnRemoteException {
+ SignalContainerRequestProto requestProto =
+ ((SignalContainerRequestPBImpl) request).getProto();
+ try {
+ return new SignalContainerResponsePBImpl(
+ proxy.signalContainer(null, requestProto));
+ } catch (ServiceException e) {
+ throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e);
+ }
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ClientRMProtocolPBServiceImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ClientRMProtocolPBServiceImpl.java
index 1c2d5b0..5a74bc5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ClientRMProtocolPBServiceImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ClientRMProtocolPBServiceImpl.java
@@ -37,6 +37,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.CancelDelegationTokenRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.CancelDelegationTokenResponsePBImpl;
@@ -60,6 +61,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.KillApplicationResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RenewDelegationTokenRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RenewDelegationTokenResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationResponsePBImpl;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
@@ -79,6 +82,8 @@
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetQueueUserAclsInfoResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.KillApplicationRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.KillApplicationResponseProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationResponseProto;
@@ -250,4 +255,17 @@ public CancelDelegationTokenResponseProto cancelDelegationToken(
throw new ServiceException(e);
}
}
+
+ @Override
+ public SignalContainerResponseProto signalContainer(RpcController arg0,
+ SignalContainerRequestProto proto) throws ServiceException {
+ SignalContainerRequestPBImpl request = new SignalContainerRequestPBImpl(proto);
+ try {
+ SignalContainerResponse response = real.signalContainer(request);
+ return ((SignalContainerResponsePBImpl)response).getProto();
+ } catch (YarnRemoteException e) {
+ throw new ServiceException(e);
+ }
+ }
+
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/HeartbeatResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/HeartbeatResponse.java
index 4536934..17e154d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/HeartbeatResponse.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/HeartbeatResponse.java
@@ -19,6 +19,7 @@
import java.util.List;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -33,7 +34,7 @@
List getApplicationsToCleanupList();
ApplicationId getApplicationsToCleanup(int index);
int getApplicationsToCleanupCount();
-
+
void setResponseId(int responseId);
void setNodeAction(NodeAction action);
@@ -49,4 +50,7 @@
void addApplicationToCleanup(ApplicationId applicationId);
void removeApplicationToCleanup(int index);
void clearApplicationsToCleanup();
+
+ List getContainersToSignalList();
+ void addAllContainersToSignal(List containers);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/HeartbeatResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/HeartbeatResponsePBImpl.java
index 8a7d890..71300cc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/HeartbeatResponsePBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/HeartbeatResponsePBImpl.java
@@ -23,6 +23,8 @@
import java.util.Iterator;
import java.util.List;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerRequestPBImpl;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ProtoBase;
@@ -34,6 +36,7 @@
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.HeartbeatResponseProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProto;
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeAction;
@@ -46,6 +49,7 @@
private List containersToCleanup = null;
private List applicationsToCleanup = null;
+ private List containersToSignal = null;
private MasterKey masterKey = null;
public HeartbeatResponsePBImpl() {
@@ -75,6 +79,9 @@ private void mergeLocalToBuilder() {
if (this.masterKey != null) {
builder.setMasterKey(convertToProtoFormat(this.masterKey));
}
+ if (this.containersToSignal != null) {
+ addContainersToSignalToProto();
+ }
}
private void mergeLocalToProto() {
@@ -324,6 +331,76 @@ private ContainerIdProto convertToProtoFormat(ContainerId t) {
return ((ContainerIdPBImpl)t).getProto();
}
+ @Override
+ public List getContainersToSignalList() {
+ initContainersToSignal();
+ return this.containersToSignal;
+ }
+
+ private void initContainersToSignal() {
+ if (this.containersToSignal != null) {
+ return;
+ }
+ HeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
+ List list = p.getContainersToSignalList();
+ this.containersToSignal = new ArrayList();
+
+ for (SignalContainerRequestProto c : list) {
+ this.containersToSignal.add(convertFromProtoFormat(c));
+ }
+ }
+
+ @Override
+ public void addAllContainersToSignal(
+ final List containersToSignal) {
+ if (containersToSignal == null)
+ return;
+ initContainersToSignal();
+ this.containersToSignal.addAll(containersToSignal);
+ }
+
+ private void addContainersToSignalToProto() {
+ maybeInitBuilder();
+ builder.clearContainersToSignal();
+ if (containersToSignal == null)
+ return;
+
+ Iterable iterable =
+ new Iterable() {
+ @Override
+ public Iterator iterator() {
+ return new Iterator() {
+ Iterator iter = containersToSignal.iterator();
+ @Override
+ public boolean hasNext() {
+ return iter.hasNext();
+ }
+
+ @Override
+ public SignalContainerRequestProto next() {
+ return convertToProtoFormat(iter.next());
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+ };
+ builder.addAllContainersToSignal(iterable);
+ }
+
+ private SignalContainerRequestPBImpl convertFromProtoFormat(
+ SignalContainerRequestProto p) {
+ return new SignalContainerRequestPBImpl(p);
+ }
+
+ private SignalContainerRequestProto convertToProtoFormat(
+ SignalContainerRequest t) {
+ return ((SignalContainerRequestPBImpl)t).getProto();
+ }
+
private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
return new ApplicationIdPBImpl(p);
}
@@ -331,7 +408,7 @@ private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
private ApplicationIdProto convertToProtoFormat(ApplicationId t) {
return ((ApplicationIdPBImpl)t).getProto();
}
-
+
private NodeAction convertFromProtoFormat(NodeActionProto p) {
return NodeAction.valueOf(p.name());
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
index 71f5b1b..53019ea 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
@@ -22,6 +22,7 @@ option java_generic_services = true;
option java_generate_equals_and_hash = true;
import "yarn_protos.proto";
+import "yarn_service_protos.proto";
enum NodeActionProto {
NORMAL = 0;
@@ -53,5 +54,6 @@ message HeartbeatResponseProto {
optional NodeActionProto nodeAction = 3;
repeated ContainerIdProto containers_to_cleanup = 4;
repeated ApplicationIdProto applications_to_cleanup = 5;
+ repeated SignalContainerRequestProto containers_to_signal = 6;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/CMgrSignalContainersEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/CMgrSignalContainersEvent.java
new file mode 100644
index 0000000..cc967b9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/CMgrSignalContainersEvent.java
@@ -0,0 +1,38 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager;
+
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
+
+public class CMgrSignalContainersEvent extends ContainerManagerEvent {
+
+ private List containerToSignal;
+
+ public CMgrSignalContainersEvent(List containerToSignal) {
+ super(ContainerManagerEventType.SIGNAL_CONTAINERS);
+ this.containerToSignal = containerToSignal;
+ }
+
+ public List getContainersToSignal() {
+ return this.containerToSignal;
+ }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
index 9cffde1..36303d4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
@@ -158,6 +158,18 @@ public int getValue() {
public String toString() {
return str;
}
+
+ public static Signal signalOf(int value) {
+ if (value == QUIT.value) {
+ return QUIT;
+ } else if (value == KILL.value) {
+ return KILL;
+ } else if (value == TERM.value) {
+ return TERM;
+ } else {
+ return NULL;
+ }
+ }
}
protected void logOutput(String output) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerManagerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerManagerEventType.java
index 4278ce0..8a7e423 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerManagerEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerManagerEventType.java
@@ -21,4 +21,5 @@
public enum ContainerManagerEventType {
FINISH_APPS,
FINISH_CONTAINERS,
+ SIGNAL_CONTAINERS,
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index 841e169..7499e7b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -35,6 +35,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
@@ -374,7 +375,7 @@ public void run() {
.getContainersToCleanupList();
if (containersToCleanup.size() != 0) {
dispatcher.getEventHandler().handle(
- new CMgrCompletedContainersEvent(containersToCleanup,
+ new CMgrCompletedContainersEvent(containersToCleanup,
CMgrCompletedContainersEvent.Reason.BY_RESOURCEMANAGER));
}
List appsToCleanup =
@@ -385,6 +386,16 @@ public void run() {
dispatcher.getEventHandler().handle(
new CMgrCompletedAppsEvent(appsToCleanup));
}
+
+ // SignalContainer request originally comes from end users via
+ // ClientRMProtocol's SignalContainer. This request will be forwarded
+ // ContainerManager which later will dispatch the event to ContainerLauncher
+ List containersToSignal = response
+ .getContainersToSignalList();
+ if (containersToSignal.size() != 0) {
+ dispatcher.getEventHandler().handle(
+ new CMgrSignalContainersEvent(containersToSignal));
+ }
} catch (Throwable e) {
// TODO Better error handling. Thread can die with the rest of the
// NM still running.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index fafbf0e..48c2ef7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -44,6 +44,7 @@
import org.apache.hadoop.yarn.api.ContainerManager;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
@@ -64,6 +65,7 @@
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedContainersEvent;
+import org.apache.hadoop.yarn.server.nodemanager.CMgrSignalContainersEvent;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.ContainerManagerEvent;
import org.apache.hadoop.yarn.server.nodemanager.Context;
@@ -72,6 +74,7 @@
import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger;
import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationContainerInitEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
@@ -86,6 +89,7 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.SignalContainersLauncherEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService;
@@ -608,6 +612,24 @@ public void handle(ContainerManagerEvent event) {
new ContainerKillEvent(container, diagnostic));
}
break;
+ case SIGNAL_CONTAINERS:
+ CMgrSignalContainersEvent containersSignalEvent =
+ (CMgrSignalContainersEvent) event;
+ for (SignalContainerRequest request : containersSignalEvent
+ .getContainersToSignal()) {
+ ContainerId containerId = request.getContainerId();
+ Container container = this.context.getContainers().get(containerId);
+ if (container != null) {
+ LOG.info("Container " + containerId + " signal request by ResourceManager");
+ this.dispatcher.getEventHandler().handle(
+ new SignalContainersLauncherEvent(container,
+ Signal.signalOf(request.getSignal())));
+ } else {
+ LOG.info("Container " + containerId + " no longer exists");
+ }
+ }
+ break;
+
default:
LOG.warn("Invalid event " + event.getType() + ". Ignoring.");
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
index acfd2a7..300076f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
@@ -26,6 +26,7 @@
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
public interface Container extends EventHandler {
@@ -46,4 +47,6 @@
String toString();
String localizationCountersAsString();
+
+ void Signal(Signal signal);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
index e9ff5ec..5e63da0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
@@ -49,6 +49,7 @@
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger;
import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
@@ -56,6 +57,7 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationContainerFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.SignalContainersLauncherEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
@@ -952,4 +954,10 @@ public String localizationCountersAsString() {
return result.toString();
}
+ @Override
+ public void Signal(Signal signal) {
+ dispatcher.getEventHandler().handle(
+ new SignalContainersLauncherEvent(this, signal));
+ }
+
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
index edbb44d..ba41f3a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
@@ -378,6 +378,56 @@ public void cleanupContainer(boolean dumpThreads) throws IOException {
}
/**
+ * Send a signal to the container.
+ *
+ * Acceptable signals: {@link Signal#QUIT}
+ *
+ * @throws IOException
+ */
+ @SuppressWarnings("unchecked") // dispatcher not typed
+ public void signalContainer(Signal signal) throws IOException {
+ ContainerId containerId = container.getContainerID();
+ String containerIdStr = ConverterUtils.toString(containerId);
+ String user = container.getUser();
+
+ LOG.info("Sending signal " + signal + " to container " + containerIdStr);
+
+ boolean alreadyLaunched = !shouldLaunchContainer.compareAndSet(false, true);
+ if (!alreadyLaunched) {
+ LOG.info("Container " + containerIdStr + " not launched."
+ + " Not sending the signal");
+ return;
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Getting pid for container " + containerIdStr
+ + " to send signal to from pid file "
+ + (pidFilePath != null ? pidFilePath.toString() : "null"));
+ }
+
+ try {
+ // get process id from pid file if available
+ // else if shell is still active, get it from the shell
+ String processId = null;
+ if (pidFilePath != null) {
+ processId = getContainerPid(pidFilePath);
+ }
+
+ if (processId != null) {
+ LOG.info("Sending signal " + signal + " to pid " + processId
+ + " as user " + user
+ + " for container " + containerIdStr);
+ exec.signalContainer(user, processId, signal);
+ }
+ } catch (Exception e) {
+ String message =
+ "Exception when sending signal to container " + containerIdStr
+ + ": " + StringUtils.stringifyException(e);
+ LOG.warn(message);
+ }
+ }
+
+ /**
* Loop through for a time-bounded interval waiting to
* read the process id from a file generated by a running process.
* @param pidFilePath File from which to read the process id
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
index 1f3fd21..1f06b71 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
@@ -147,6 +147,23 @@ public void handle(ContainersLauncherEvent event) {
+ ". Ignoring.");
}
break;
+ case SIGNAL_CONTAINER:
+ SignalContainersLauncherEvent signalEvent =
+ (SignalContainersLauncherEvent) event;
+ RunningContainer runningContainer = running.get(containerId);
+ if (runningContainer == null) {
+ // Container not launched. So nothing needs to be done.
+ LOG.info("Container " + containerId + " not running, nothing to signal.");
+ return;
+ }
+
+ try {
+ runningContainer.launcher.signalContainer(signalEvent.getSignal());
+ } catch (IOException e) {
+ LOG.warn("Got exception while cleaning container " + containerId
+ + ". Ignoring.");
+ }
+ break;
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java
index 6793bf7..92e1fcc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java
@@ -21,4 +21,5 @@
public enum ContainersLauncherEventType {
LAUNCH_CONTAINER,
CLEANUP_CONTAINER, // The process(grp) itself.
+ SIGNAL_CONTAINER,
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/SignalContainersLauncherEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/SignalContainersLauncherEvent.java
new file mode 100644
index 0000000..d5ed469
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/SignalContainersLauncherEvent.java
@@ -0,0 +1,37 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher;
+
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+
+// This event can be triggered by one of the following flows
+// WebUI -> Container
+// CLI -> ClientRMProtocol -> NM HeartbeatResponse -> ContainerManager
+public class SignalContainersLauncherEvent extends ContainersLauncherEvent{
+
+ private final Signal signal;
+ public SignalContainersLauncherEvent(Container container, Signal signal) {
+ super(container, ContainersLauncherEventType.SIGNAL_CONTAINER);
+ this.signal = signal;
+ }
+ public Signal getSignal() {
+ return signal;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/AllContainersPage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/AllContainersPage.java
index 1bbb945..8eefb43 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/AllContainersPage.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/AllContainersPage.java
@@ -52,7 +52,7 @@
private String containersTableInit() {
return tableInit().
// containerid, containerid, log-url
- append(", aoColumns:[null, null, {bSearchable:false}]} ").toString();
+ append(", aoColumns:[null, null, {bSearchable:false}, null]} ").toString();
}
@Override
@@ -79,6 +79,7 @@ protected void render(Block html) {
.td()._("ContainerId")._()
.td()._("ContainerState")._()
.td()._("logs")._()
+ .td()._("DumpThreads")._()
._()
._().tbody();
for (Entry entry : this.nmContext
@@ -91,6 +92,8 @@ protected void render(Block html) {
.td()._(info.getState())._()
.td()
.a(url(info.getShortLogLink()), "logs")._()
+ .td()
+ .a(url(info.getShortDumpThreadsLink()), "Click to dump threads")._()
._();
}
tableBody._()._()._();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerDumpThreadsPage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerDumpThreadsPage.java
new file mode 100644
index 0000000..d266fa3
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerDumpThreadsPage.java
@@ -0,0 +1,136 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.webapp;
+
+import static org.apache.hadoop.yarn.webapp.view.JQueryUI.ACCORDION;
+import static org.apache.hadoop.yarn.webapp.view.JQueryUI.initID;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
+import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.ContainerInfo;
+import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.webapp.SubView;
+import org.apache.hadoop.yarn.webapp.YarnWebParams;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.DIV;
+import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
+
+import com.google.inject.Inject;
+
+public class ContainerDumpThreadsPage extends NMView implements YarnWebParams {
+
+ @Override
+ protected void preHead(Page.HTML<_> html) {
+ commonPreHead(html);
+
+ setTitle("Threads Dump for " + $(CONTAINER_ID));
+ set(initID(ACCORDION, "nav"), "{autoHeight:false, active:0}");
+ }
+
+ @Override
+ protected Class extends SubView> content() {
+ return ContainerBlock.class;
+ }
+
+ public static class ContainerBlock extends HtmlBlock implements YarnWebParams {
+
+ private final Context nmContext;
+ private final ApplicationACLsManager aclsManager;
+
+ @Inject
+ public ContainerBlock(Context nmContext, ApplicationACLsManager aclsManager) {
+ this.nmContext = nmContext;
+ this.aclsManager = aclsManager;
+ }
+
+ @Override
+ protected void render(Block html) {
+ ContainerId containerID;
+ try {
+ containerID = ConverterUtils.toContainerId($(CONTAINER_ID));
+ } catch (IllegalArgumentException e) {
+ html.p()._("Invalid containerId " + $(CONTAINER_ID))._();
+ return;
+ }
+
+ DIV div = html.div("#content");
+
+ ApplicationId applicationId = containerID.getApplicationAttemptId()
+ .getApplicationId();
+ Application application = this.nmContext.getApplications().get(
+ applicationId);
+ if (application == null) {
+ html.h1(
+ "Unknown container. Container either has not started or "
+ + "has already completed or doesn't belong to this node at all.");
+ return;
+ }
+
+ Container container = this.nmContext.getContainers().get(containerID);
+ if (container == null) {
+ div.h1("Unknown Container. Container might have completed, "
+ + "please go back to the previous page and retry.")._();
+ return;
+ }
+
+ if (container.getContainerState() != ContainerState.RUNNING &&
+ container.getContainerState() != ContainerState.KILLING) {
+ html.h1("Container wasn't started.");
+ return;
+ }
+
+ dumpThreads(html, container, applicationId, application);
+ }
+
+ private void dumpThreads(Block html, Container container,
+ ApplicationId applicationId, Application application) {
+ // Check for the authorization.
+ String remoteUser = request().getRemoteUser();
+ UserGroupInformation callerUGI = null;
+
+ if (remoteUser != null) {
+ callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
+ }
+ if (callerUGI != null && !this.aclsManager.checkAccess(callerUGI,
+ ApplicationAccessType.MODIFY_APP, application.getUser(),
+ applicationId)) {
+ html.h1(
+ "User [" + remoteUser
+ + "] is not authorized to signal the container "
+ + container.getContainerID());
+ return;
+ }
+
+ container.Signal(ContainerExecutor.Signal.QUIT);
+ ContainerInfo info = new ContainerInfo(this.nmContext, container);
+ String logUrl = info.getShortLogLink();
+ html.p().a(".logslink", url(logUrl, "stdout", "?start=-4096"),
+ "Threads Dump captured successfully. Click here for the result.")._();
+ return;
+ }
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerPage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerPage.java
index 060d72a..c065710 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerPage.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerPage.java
@@ -85,7 +85,8 @@ protected void render(Block html) {
._("Diagnostics", info.getDiagnostics())
._("User", info.getUser())
._("TotalMemoryNeeded", info.getMemoryNeeded())
- ._("logs", info.getShortLogLink(), "Link to logs");
+ ._("logs", info.getShortLogLink(), "Link to logs")
+ ._("Dump Threads", info.getShortDumpThreadsLink(), "Click to dump threads");
html._(InfoBlock.class);
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMController.java
index 86e2505..c2ed70f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMController.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMController.java
@@ -103,4 +103,9 @@ public void logs() {
}
render(ContainerLogsPage.class);
}
+
+ public void containerDumpThreads() {
+ render(ContainerDumpThreadsPage.class);
+ }
+
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java
index 3128217..0250736 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java
@@ -118,6 +118,9 @@ public void setup() {
route(
pajoin("/containerlogs", CONTAINER_ID, APP_OWNER, CONTAINER_LOG_TYPE),
NMController.class, "logs");
+ route(
+ pajoin("/containerdumpthreads", CONTAINER_ID, APP_OWNER), NMController.class,
+ "containerDumpThreads");
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/dao/ContainerInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/dao/ContainerInfo.java
index 41c649e..7d5fa79 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/dao/ContainerInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/dao/ContainerInfo.java
@@ -48,6 +48,8 @@
protected String containerLogsShortLink;
@XmlTransient
protected String exitStatus;
+ @XmlTransient
+ protected String containerDumpThreadsShortLink;
public ContainerInfo() {
} // JAXB needs this
@@ -78,6 +80,8 @@ public ContainerInfo(final Context nmContext, final Container container,
}
this.containerLogsShortLink = ujoin("containerlogs", this.id,
container.getUser());
+ this.containerDumpThreadsShortLink = ujoin("containerdumpthreads", this.id,
+ container.getUser());
if (requestUri == null) {
requestUri = "";
@@ -129,4 +133,8 @@ public long getMemoryNeeded() {
return this.totalMemoryNeededMB;
}
+ public String getShortDumpThreadsLink() {
+ return this.containerDumpThreadsShortLink;
+ }
+
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
index d65b096..ca72adb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
@@ -39,6 +39,7 @@
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -107,9 +108,15 @@ public void tearDown() {
private class MyResourceTracker implements ResourceTracker {
private final Context context;
+ private boolean signalContainer;
public MyResourceTracker(Context context) {
+ this(context, false);
+ }
+
+ public MyResourceTracker(Context context, boolean signalContainer) {
this.context = context;
+ this.signalContainer = signalContainer;
}
@Override
@@ -167,6 +174,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
nodeStatus.setResponseId(heartBeatID++);
Map> appToContainers =
getAppToContainerStatusMap(nodeStatus.getContainersStatuses());
+ List containersToSignal = null;
if (heartBeatID == 1) {
Assert.assertEquals(0, nodeStatus.getContainersStatuses().size());
@@ -194,6 +202,15 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
this.context.getContainers();
Assert.assertEquals(1, activeContainers.size());
+ if (this.signalContainer) {
+ containersToSignal = new ArrayList();
+ SignalContainerRequest signalReq = recordFactory
+ .newRecordInstance(SignalContainerRequest.class);
+ signalReq.setContainerId(firstContainerID);
+ signalReq.setSignal(3);
+ containersToSignal.add(signalReq);
+ }
+
// Give another container to the NM.
applicationID.setId(heartBeatID);
appAttemptID.setApplicationId(applicationID);
@@ -221,7 +238,9 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
HeartbeatResponse response = recordFactory
.newRecordInstance(HeartbeatResponse.class);
response.setResponseId(heartBeatID);
-
+ if (containersToSignal != null) {
+ response.addAllContainersToSignal(containersToSignal);
+ }
NodeHeartbeatResponse nhResponse = recordFactory
.newRecordInstance(NodeHeartbeatResponse.class);
nhResponse.setHeartbeatResponse(response);
@@ -229,14 +248,41 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
}
}
+ private class MyContainerManager extends ContainerManagerImpl {
+ public boolean signaled = false;
+
+ public MyContainerManager(Context context, ContainerExecutor exec,
+ DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater,
+ NodeManagerMetrics metrics, ApplicationACLsManager aclsManager,
+ LocalDirsHandlerService dirsHandler) {
+ super(context, exec, deletionContext, nodeStatusUpdater,
+ metrics, aclsManager, dirsHandler);
+ }
+
+ @Override
+ public void handle(ContainerManagerEvent event) {
+ if (event.getType() == ContainerManagerEventType.SIGNAL_CONTAINERS) {
+ signaled = true;
+ }
+ }
+
+ }
+
private class MyNodeStatusUpdater extends NodeStatusUpdaterImpl {
- public ResourceTracker resourceTracker = new MyResourceTracker(this.context);
+ public ResourceTracker resourceTracker;
private Context context;
public MyNodeStatusUpdater(Context context, Dispatcher dispatcher,
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
+ this(context, dispatcher, healthChecker, metrics, false);
+ }
+
+ public MyNodeStatusUpdater(Context context, Dispatcher dispatcher,
+ NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics,
+ boolean signalContainer) {
super(context, dispatcher, healthChecker, metrics);
this.context = context;
+ this.resourceTracker = new MyResourceTracker(this.context, signalContainer);
}
@Override
@@ -309,7 +355,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
.newRecordInstance(HeartbeatResponse.class);
response.setResponseId(heartBeatID);
response.setNodeAction(heartBeatNodeAction);
-
+
NodeHeartbeatResponse nhResponse = recordFactory
.newRecordInstance(NodeHeartbeatResponse.class);
nhResponse.setHeartbeatResponse(response);
@@ -394,8 +440,18 @@ public void testNMRegistration() throws InterruptedException {
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
return new MyNodeStatusUpdater(context, dispatcher, healthChecker,
- metrics);
+ metrics);
+ }
+ @Override
+ protected ContainerManagerImpl createContainerManager(Context context,
+ ContainerExecutor exec, DeletionService del,
+ NodeStatusUpdater nodeStatusUpdater,
+ ApplicationACLsManager aclsManager,
+ LocalDirsHandlerService diskhandler) {
+ return new MyContainerManager(context, exec, del, nodeStatusUpdater,
+ metrics, aclsManager, diskhandler);
}
+
};
YarnConfiguration conf = createNMConfig();
@@ -446,7 +502,68 @@ public void run() {
nm.stop();
}
-
+
+ //Verify that signalContainer request can be dispatched from
+ //NodeStatusUpdaterImpl to ContainerManagerImpl.
+ @Test
+ public void testSignalContainerToContainerManager() throws Exception {
+ nm = new NodeManager() {
+ @Override
+ protected NodeStatusUpdater createNodeStatusUpdater(Context context,
+ Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
+ return new MyNodeStatusUpdater(
+ context, dispatcher, healthChecker, metrics, true);
+ }
+
+ @Override
+ protected ContainerManagerImpl createContainerManager(Context context,
+ ContainerExecutor exec, DeletionService del,
+ NodeStatusUpdater nodeStatusUpdater,
+ ApplicationACLsManager aclsManager,
+ LocalDirsHandlerService diskhandler) {
+ return new MyContainerManager(context, exec, del, nodeStatusUpdater,
+ metrics, aclsManager, diskhandler);
+ }
+
+ };
+
+ YarnConfiguration conf = createNMConfig();
+ nm.init(conf);
+ nm.start();
+
+ System.out.println(" ----- thread already started.."
+ + nm.getServiceState());
+
+ int waitCount = 0;
+ while (nm.getServiceState() == STATE.INITED && waitCount++ != 20) {
+ LOG.info("Waiting for NM to start..");
+ if (nmStartError != null) {
+ LOG.error("Error during startup. ", nmStartError);
+ Assert.fail(nmStartError.getCause().getMessage());
+ }
+ Thread.sleep(1000);
+ }
+ if (nm.getServiceState() != STATE.STARTED) {
+ // NM could have failed.
+ Assert.fail("NodeManager failed to start");
+ }
+
+ waitCount = 0;
+ while (heartBeatID <= 3 && waitCount++ != 20) {
+ Thread.sleep(500);
+ }
+ Assert.assertFalse(heartBeatID <= 3);
+ Assert.assertEquals("Number of registered NMs is wrong!!", 1,
+ this.registeredNodes.size());
+
+ MyContainerManager containerManager =
+ (MyContainerManager)nm.getContainerManager();
+ Assert.assertTrue(containerManager.signaled);
+
+ nm.stop();
+
+ }
+
@Test
public void testStopReentrant() throws Exception {
final AtomicInteger numCleanups = new AtomicInteger(0);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
index 5b01cc0..cf8c220 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
@@ -18,6 +18,11 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
@@ -35,7 +40,9 @@
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.yarn.api.ContainerManager;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -51,6 +58,8 @@
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
+import org.apache.hadoop.yarn.server.nodemanager.CMgrSignalContainersEvent;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
@@ -494,4 +503,98 @@ public void testLocalFilesCleanup() throws InterruptedException,
Assert.assertFalse(targetFile.getAbsolutePath() + " exists!!",
targetFile.exists());
}
+
+ @Test
+ public void testContainerLaunchAndSignal() throws IOException,
+ InterruptedException {
+
+ ContainerExecutor executorSpy = spy(exec);
+ containerManager =
+ new ContainerManagerImpl(context, executorSpy, delSrvc, nodeStatusUpdater,
+ metrics, new ApplicationACLsManager(conf), dirsHandler);
+ containerManager.init(conf);
+ containerManager.start();
+
+ File scriptFile = new File(tmpDir, "scriptFile.sh");
+ PrintWriter fileWriter = new PrintWriter(scriptFile);
+ File processStartFile =
+ new File(tmpDir, "start_file.txt").getAbsoluteFile();
+ fileWriter.write("\numask 0"); // So that start file is readable by the test
+ fileWriter.write("\necho Hello World! > " + processStartFile);
+ fileWriter.write("\necho $$ >> " + processStartFile);
+ fileWriter.write("\nexec sleep 1000s");
+ fileWriter.close();
+
+ ContainerLaunchContext containerLaunchContext =
+ recordFactory.newRecordInstance(ContainerLaunchContext.class);
+
+ // ////// Construct the Container-id
+ ContainerId cId = createContainerId();
+ containerLaunchContext.setContainerId(cId);
+
+ containerLaunchContext.setUser(user);
+
+ URL resource_alpha =
+ ConverterUtils.getYarnUrlFromPath(localFS
+ .makeQualified(new Path(scriptFile.getAbsolutePath())));
+ LocalResource rsrc_alpha =
+ recordFactory.newRecordInstance(LocalResource.class);
+ rsrc_alpha.setResource(resource_alpha);
+ rsrc_alpha.setSize(-1);
+ rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION);
+ rsrc_alpha.setType(LocalResourceType.FILE);
+ rsrc_alpha.setTimestamp(scriptFile.lastModified());
+ String destinationFile = "dest_file";
+ Map localResources =
+ new HashMap();
+ localResources.put(destinationFile, rsrc_alpha);
+ containerLaunchContext.setLocalResources(localResources);
+ containerLaunchContext.setUser(containerLaunchContext.getUser());
+ List commands = new ArrayList();
+ commands.add("/bin/bash");
+ commands.add(scriptFile.getAbsolutePath());
+ containerLaunchContext.setCommands(commands);
+ containerLaunchContext.setResource(recordFactory
+ .newRecordInstance(Resource.class));
+ containerLaunchContext.getResource().setMemory(100 * 1024 * 1024);
+ StartContainerRequest startRequest = recordFactory.newRecordInstance(StartContainerRequest.class);
+ startRequest.setContainerLaunchContext(containerLaunchContext);
+ containerManager.startContainer(startRequest);
+
+ int timeoutSecs = 0;
+ while (!processStartFile.exists() && timeoutSecs++ < 20) {
+ Thread.sleep(1000);
+ LOG.info("Waiting for process start-file to be created");
+ }
+ Assert.assertTrue("ProcessStartFile doesn't exist!",
+ processStartFile.exists());
+
+ // Simulate NodeStatusUpdaterImpl sending CMgrSignalContainersEvent
+ SignalContainerRequest signalContainerRequest =
+ recordFactory.newRecordInstance(SignalContainerRequest.class);
+ signalContainerRequest.setContainerId(cId);
+ signalContainerRequest.setSignal(3);
+ List reqs = new ArrayList();
+ reqs.add(signalContainerRequest);
+ containerManager.handle(new CMgrSignalContainersEvent(reqs));
+
+ verify(executorSpy).signalContainer(anyString(), anyString(), any(Signal.class));
+
+ StopContainerRequest stopRequest = recordFactory.newRecordInstance(StopContainerRequest.class);
+ stopRequest.setContainerId(cId);
+ containerManager.stopContainer(stopRequest);
+
+ BaseContainerManagerTest.waitForContainerState(containerManager, cId,
+ ContainerState.COMPLETE);
+
+ GetContainerStatusRequest gcsRequest =
+ recordFactory.newRecordInstance(GetContainerStatusRequest.class);
+ gcsRequest.setContainerId(cId);
+ ContainerStatus containerStatus =
+ containerManager.getContainerStatus(gcsRequest).getStatus();
+ Assert.assertEquals(ExitCode.TERMINATED.getExitCode(),
+ containerStatus.getExitStatus());
+
+ }
+
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
index f8894c7..66631a8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
@@ -33,6 +33,7 @@
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
@@ -124,4 +125,8 @@ public String localizationCountersAsString() {
public void handle(ContainerEvent event) {
}
+ @Override
+ public void Signal(Signal signal) {
+ }
+
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
index ec4053c..ea28bd2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
@@ -62,12 +62,15 @@
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.DelegationToken;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.QueueInfo;
@@ -85,7 +88,9 @@
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppKillEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeSignalContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
@@ -580,4 +585,66 @@ private boolean isAllowedDelegationTokenOp() throws IOException {
return true;
}
}
+
+ /**
+ * Signal a container.
+ * After the request passes some sanity check, it will be delivered
+ * to RMNodeImpl so that the next NM heartbeat will pick up the signal request
+ */
+ @Override
+ public SignalContainerResponse signalContainer(
+ SignalContainerRequest request) throws YarnRemoteException {
+ ContainerId containerId = request.getContainerId();
+
+ UserGroupInformation callerUGI;
+ try {
+ callerUGI = UserGroupInformation.getCurrentUser();
+ } catch (IOException ie) {
+ LOG.info("Error getting UGI ", ie);
+ throw RPCUtil.getRemoteException(ie);
+ }
+
+ ApplicationId applicationId = containerId.getApplicationAttemptId().
+ getApplicationId();
+ RMApp application = this.rmContext.getRMApps().get(applicationId);
+ if (application == null) {
+ RMAuditLogger.logFailure(callerUGI.getUserName(),
+ AuditConstants.SIGNAL_CONTAINER, "UNKNOWN", "ClientRMService",
+ "Trying to signal an absent container", applicationId, containerId);
+ throw RPCUtil
+ .getRemoteException("Trying to signal an absent container "
+ + containerId);
+ }
+
+ if (!checkAccess(callerUGI, application.getUser(),
+ ApplicationAccessType.MODIFY_APP, applicationId)) {
+ RMAuditLogger.logFailure(callerUGI.getShortUserName(),
+ AuditConstants.SIGNAL_CONTAINER, "User doesn't have permissions to "
+ + ApplicationAccessType.MODIFY_APP.toString(), "ClientRMService",
+ AuditConstants.UNAUTHORIZED_USER, applicationId);
+ throw RPCUtil.getRemoteException(new AccessControlException("User "
+ + callerUGI.getShortUserName() + " cannot perform operation "
+ + ApplicationAccessType.MODIFY_APP.name() + " on " + applicationId));
+ }
+
+ RMContainer container = scheduler.getRMContainer(containerId);
+ if (container != null) {
+ this.rmContext.getDispatcher().getEventHandler().handle(
+ new RMNodeSignalContainerEvent(container.getContainer().getNodeId(),
+ request));
+ RMAuditLogger.logSuccess(callerUGI.getShortUserName(),
+ AuditConstants.SIGNAL_CONTAINER, "ClientRMService", applicationId,
+ containerId);
+ } else {
+ RMAuditLogger.logFailure(callerUGI.getUserName(),
+ AuditConstants.SIGNAL_CONTAINER, "UNKNOWN", "ClientRMService",
+ "Trying to signal an absent container", applicationId, containerId);
+ throw RPCUtil
+ .getRemoteException("Trying to signal an absent container "
+ + containerId);
+ }
+
+ return recordFactory
+ .newRecordInstance(SignalContainerResponse.class);
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java
index b9261ca..3a1889d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java
@@ -51,6 +51,7 @@
public static final String REGISTER_AM = "Register App Master";
public static final String ALLOC_CONTAINER = "AM Allocated Container";
public static final String RELEASE_CONTAINER = "AM Released Container";
+ public static final String SIGNAL_CONTAINER = "Signal Container Request";
// Some commonly used descriptions
public static final String UNAUTHORIZED_USER = "Unauthorized user";
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java
index ef644be..f59454a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java
@@ -36,6 +36,7 @@
// Source: Container
CONTAINER_ALLOCATED,
CLEANUP_CONTAINER,
+ SIGNAL_CONTAINER,
// Source: NMLivelinessMonitor
EXPIRE
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index 7b9b8b1..c1d1da5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -35,6 +35,7 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.net.Node;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
@@ -104,6 +105,10 @@
private final Set containersToClean = new TreeSet(
new ContainerIdComparator());
+ /* set of containers that need to be signaled */
+ private final List containersToSignal =
+ new ArrayList();
+
/* the list of applications that have finished and need to be purged */
private final List finishedApplications = new ArrayList();
@@ -124,7 +129,7 @@
RMNodeEventType.STARTED, new AddNodeTransition())
//Transitions from RUNNING state
- .addTransition(NodeState.RUNNING,
+ .addTransition(NodeState.RUNNING,
EnumSet.of(NodeState.RUNNING, NodeState.UNHEALTHY),
RMNodeEventType.STATUS_UPDATE, new StatusUpdateWhenHealthyTransition())
.addTransition(NodeState.RUNNING, NodeState.DECOMMISSIONED,
@@ -142,9 +147,11 @@
RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition())
.addTransition(NodeState.RUNNING, NodeState.RUNNING,
RMNodeEventType.RECONNECTED, new ReconnectNodeTransition())
+ .addTransition(NodeState.RUNNING, NodeState.RUNNING,
+ RMNodeEventType.SIGNAL_CONTAINER, new SignalContainerTransition())
//Transitions from UNHEALTHY state
- .addTransition(NodeState.UNHEALTHY,
+ .addTransition(NodeState.UNHEALTHY,
EnumSet.of(NodeState.UNHEALTHY, NodeState.RUNNING),
RMNodeEventType.STATUS_UPDATE, new StatusUpdateWhenUnHealthyTransition())
.addTransition(NodeState.UNHEALTHY, NodeState.DECOMMISSIONED,
@@ -162,7 +169,9 @@
RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition())
.addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY,
RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition())
-
+ .addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY,
+ RMNodeEventType.SIGNAL_CONTAINER, new SignalContainerTransition())
+
// create the topology tables
.installTopology();
@@ -311,8 +320,10 @@ public void updateHeartbeatResponseForCleanup(HeartbeatResponse response) {
response.addAllContainersToCleanup(
new ArrayList(this.containersToClean));
response.addAllApplicationsToCleanup(this.finishedApplications);
+ response.addAllContainersToSignal(this.containersToSignal);
this.containersToClean.clear();
this.finishedApplications.clear();
+ this.containersToSignal.clear();
} finally {
this.writeLock.unlock();
}
@@ -469,6 +480,16 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
}
}
+ public static class SignalContainerTransition implements
+ SingleArcTransition {
+
+ @Override
+ public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
+ rmNode.containersToSignal.add(((
+ RMNodeSignalContainerEvent) event).getSignalRequest());
+ }
+ }
+
public static class DeactivateNodeTransition
implements SingleArcTransition {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeSignalContainerEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeSignalContainerEvent.java
new file mode 100644
index 0000000..9f5ecf0
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeSignalContainerEvent.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
+
+public class RMNodeSignalContainerEvent extends RMNodeEvent {
+
+ private SignalContainerRequest signalRequest;
+
+ public RMNodeSignalContainerEvent(NodeId nodeId,
+ SignalContainerRequest signalRequest) {
+ super(nodeId, RMNodeEventType.SIGNAL_CONTAINER);
+ this.signalRequest = signalRequest;
+ }
+
+ public SignalContainerRequest getSignalRequest() {
+ return this.signalRequest;
+ }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
index f084649..5e3cb71 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
@@ -33,6 +33,7 @@
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
/**
@@ -130,4 +131,14 @@ public QueueInfo getQueueInfo(String queueName, boolean includeChildQueues,
@LimitedPrivate("yarn")
@Evolving
QueueMetrics getRootQueueMetrics();
+
+ /**
+ * Get a specific live container based on the container id
+ * @param containerId the container id
+ * @return live container
+ */
+ @LimitedPrivate("yarn")
+ @Evolving
+ RMContainer getRMContainer(ContainerId containerId);
+
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 2fc7540..d03bdbe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -800,7 +800,8 @@ FiCaSchedulerNode getNode(NodeId nodeId) {
return nodes.get(nodeId);
}
- private RMContainer getRMContainer(ContainerId containerId) {
+ @Override
+ public RMContainer getRMContainer(ContainerId containerId) {
FiCaSchedulerApp application =
getApplication(containerId.getApplicationAttemptId());
return (application == null) ? null : application.getRMContainer(containerId);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 310e376..50e54a8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -177,7 +177,8 @@ public QueueManager getQueueManager() {
return queueMgr;
}
- private RMContainer getRMContainer(ContainerId containerId) {
+ @Override
+ public RMContainer getRMContainer(ContainerId containerId) {
FSSchedulerApp application =
applications.get(containerId.getApplicationAttemptId());
return (application == null) ? null : application.getRMContainer(containerId);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
index 3f25537..992c51d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
@@ -781,8 +781,9 @@ public synchronized SchedulerNodeReport getNodeReport(NodeId nodeId) {
FiCaSchedulerNode node = getNode(nodeId);
return node == null ? null : new SchedulerNodeReport(node);
}
-
- private RMContainer getRMContainer(ContainerId containerId) {
+
+ @Override
+ public RMContainer getRMContainer(ContainerId containerId) {
FiCaSchedulerApp application =
getApplication(containerId.getApplicationAttemptId());
return (application == null) ? null : application.getRMContainer(containerId);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index 12391c6..7011357 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -29,12 +29,15 @@
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
@@ -239,6 +242,16 @@ public void killApp(ApplicationId appId) throws Exception {
client.forceKillApplication(req);
}
+ public void signalContainer(ContainerId containerId, int signal)
+ throws Exception {
+ ClientRMProtocol client = getClientRMService();
+ SignalContainerRequest req = Records
+ .newRecord(SignalContainerRequest.class);
+ req.setContainerId(containerId);
+ req.setSignal(signal);
+ client.signalContainer(req);
+ }
+
// from AMLauncher
public MockAM sendAMLaunched(ApplicationAttemptId appAttemptId)
throws Exception {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestSignalContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestSignalContainer.java
new file mode 100644
index 0000000..01dca9f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestSignalContainer.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.junit.Test;
+
+public class TestSignalContainer {
+
+ private static final Log LOG = LogFactory
+ .getLog(TestSignalContainer.class);
+
+ @Test
+ public void testSignalContainer() throws Exception {
+ Logger rootLogger = LogManager.getRootLogger();
+ rootLogger.setLevel(Level.DEBUG);
+ MockRM rm = new MockRM();
+ rm.start();
+
+ MockNM nm1 = rm.registerNode("h1:1234", 5000);
+
+ RMApp app = rm.submitApp(2000);
+
+ //kick the scheduling
+ nm1.nodeHeartbeat(true);
+
+ RMAppAttempt attempt = app.getCurrentAppAttempt();
+ MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
+ am.registerAppAttempt();
+
+ //request for containers
+ int request = 2;
+ am.allocate("h1" , 1000, request,
+ new ArrayList());
+
+ //kick the scheduler
+ nm1.nodeHeartbeat(true);
+ List conts = am.allocate(new ArrayList(),
+ new ArrayList()).getAllocatedContainers();
+ int contReceived = conts.size();
+ int waitCount = 0;
+ while (contReceived < request && waitCount++ < 200) {
+ LOG.info("Got " + contReceived + " containers. Waiting to get "
+ + request);
+ Thread.sleep(100);
+ conts = am.allocate(new ArrayList(),
+ new ArrayList()).getAllocatedContainers();
+ contReceived += conts.size();
+ }
+ Assert.assertEquals(request, contReceived);
+
+ for(Container container : conts) {
+ rm.signalContainer(container.getId(), 3);
+ }
+
+ HeartbeatResponse resp = nm1.nodeHeartbeat(true);
+ List contsToSignal = resp.getContainersToSignalList();
+ int signaledConts = contsToSignal.size();
+
+
+ waitCount = 0;
+ while ( signaledConts < 2 && waitCount++ < 200) {
+ LOG.info("Waiting to get signalcontainer events.. signaledConts: "
+ + signaledConts);
+ Thread.sleep(100);
+ resp = nm1.nodeHeartbeat(true);
+ contsToSignal = resp.getContainersToSignalList();
+ signaledConts += contsToSignal.size();
+ }
+
+ Assert.assertEquals(2, signaledConts);
+
+ am.unregisterAppAttempt();
+ nm1.nodeHeartbeat(attempt.getAppAttemptId(), 1,
+ ContainerState.COMPLETE);
+ am.waitForState(RMAppAttemptState.FINISHED);
+
+ rm.stop();
+ }
+
+ public static void main(String[] args) throws Exception {
+ TestSignalContainer t = new TestSignalContainer();
+ t.testSignalContainer();
+ }
+}
\ No newline at end of file