diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/JobBlock.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/JobBlock.java index 31315bc..946ec0e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/JobBlock.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/JobBlock.java @@ -96,6 +96,7 @@ th(_TH, "Start Time"). th(_TH, "Node"). th(_TH, "Logs"). + th(_TH, "Dump Threads"). _(); for (AMInfo amInfo : amInfos) { AMAttemptInfo attempt = new AMAttemptInfo(amInfo, @@ -107,8 +108,10 @@ td().a(".nodelink", url(HttpConfig.getSchemePrefix(), attempt.getNodeHttpAddress()), attempt.getNodeHttpAddress())._(). - td().a(".logslink", url(attempt.getLogsLink()), + td().a(".logslink", url(attempt.getLogsLink()), "logs")._(). + td().a(".dumpthreads", url(attempt.getDumpThreadsLink()), + "Dump Threads")._(). _(); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TaskPage.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TaskPage.java index 809f06a..1aa73c8 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TaskPage.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TaskPage.java @@ -66,7 +66,9 @@ protected void render(Block html) { th(".tsh", "Started"). th(".tsh", "Finished"). th(".tsh", "Elapsed"). - th(".note", "Note")._()._()._(); + th(".note", "Note"). + th(".dumpthreads", "DumpThreads"). + _()._()._(); // Write all the data into a JavaScript array of arrays for JQuery // DataTables to display StringBuilder attemptsTableData = new StringBuilder("[\n"); @@ -98,7 +100,11 @@ protected void render(Block html) { .append(ta.getFinishTime()).append("\",\"") .append(ta.getElapsedTime()).append("\",\"") .append(StringEscapeUtils.escapeJavaScript(StringEscapeUtils.escapeHtml( - diag))).append("\"],\n"); + diag))).append("\",\"") + .append(ta.getAssignedContainerId() == null ? 
"N/A" : + "Click to dump threads") + .append("\"],\n"); } //Remove the last comma and close off the array of arrays if(attemptsTableData.charAt(attemptsTableData.length() - 2) == ',') { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/AMAttemptInfo.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/AMAttemptInfo.java index 8dcb7c5..74929f6 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/AMAttemptInfo.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/AMAttemptInfo.java @@ -40,6 +40,7 @@ protected long startTime; protected String containerId; protected String logsLink; + protected String dumpThreadsLink; public AMAttemptInfo() { } @@ -66,6 +67,8 @@ public AMAttemptInfo(AMInfo amInfo, String jobId, String user) { this.containerId = containerId.toString(); this.logsLink = join(HttpConfig.getSchemePrefix() + nodeHttpAddress, ujoin("node", "containerlogs", this.containerId, user)); + this.dumpThreadsLink = join(HttpConfig.getSchemePrefix() + nodeHttpAddress, + ujoin("node", "containerdumpthreads", this.containerId)); } } @@ -93,4 +96,8 @@ public String getLogsLink() { return this.logsLink; } + public String getDumpThreadsLink() { + return this.dumpThreadsLink; + } + } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java index 08e9b31..216c6f4 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java @@ -63,6 +63,7 @@ import org.apache.hadoop.util.ReflectionUtils; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.yarn.api.records.ContainerId; /** Implements MapReduce locally, in-process, for debugging. 
*/ @InterfaceAudience.Private @@ -842,7 +843,14 @@ public LogParams getLogFileParams(org.apache.hadoop.mapreduce.JobID jobID, throws IOException, InterruptedException { throw new UnsupportedOperationException("Not supported"); } - + + @Override + public ContainerId getContainerId(org.apache.hadoop.mapreduce.JobID jobID, + org.apache.hadoop.mapreduce.TaskAttemptID taskAttemptID) + throws IOException, InterruptedException { + throw new UnsupportedOperationException("Not supported"); + } + static void setupChildMapredLocalDirs(Task t, JobConf conf) { String[] localDirs = conf.getTrimmedStrings(MRConfig.LOCAL_DIR); String jobId = t.getJobID().toString(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml index d075e0c..7cfbeff 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml @@ -39,6 +39,10 @@ org.apache.hadoop + hadoop-yarn-client + + + org.apache.hadoop hadoop-hdfs test diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Cluster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Cluster.java index e93f273..cd158b4 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Cluster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Cluster.java @@ -42,6 +42,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.api.records.ContainerId; /** * Provides a way to access information about the map/reduce cluster. @@ -223,6 +224,11 @@ public LogParams getLogParams(JobID jobID, TaskAttemptID taskAttemptID) return client.getLogFileParams(jobID, taskAttemptID); } + public ContainerId getContainerId(JobID jobID, TaskAttemptID taskAttemptID) + throws IOException, InterruptedException { + return client.getContainerId(jobID, taskAttemptID); + } + /** * Get current cluster status. * diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java index a4ee4a4..ed40b0a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java @@ -45,6 +45,7 @@ import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenInfo; +import org.apache.hadoop.yarn.api.records.ContainerId; /** * Protocol that a JobClient and the central JobTracker use to communicate. 
The @@ -120,7 +121,7 @@ * MAPREDUCE-3146 * Version 39: Added killTask(TaskAttemptID, boolean, String) */ - public static final long versionID = 39L; + public static final long versionID = 40L; /** * Allocate a name for the job. @@ -384,4 +385,15 @@ public void cancelDelegationToken(Token token */ public LogParams getLogFileParams(JobID jobID, TaskAttemptID taskAttemptID) throws IOException, InterruptedException; + + /** + * Gets the container id of a specific task attempt + * @param jobID the jobId. + * @param taskAttemptID the taskAttemptId. + * @return container id. + * @throws IOException + * @throws InterruptedException + */ + public ContainerId getContainerId(JobID jobID, TaskAttemptID taskAttemptID) + throws IOException, InterruptedException; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/tools/CLI.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/tools/CLI.java index 8416cca..9c5dffe 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/tools/CLI.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/tools/CLI.java @@ -49,6 +49,9 @@ import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.client.YarnClient; +import org.apache.hadoop.yarn.client.YarnClientImpl; import org.apache.hadoop.yarn.logaggregation.LogDumper; import com.google.common.base.Charsets; @@ -61,6 +64,7 @@ public class CLI extends Configured implements Tool { private static final Log LOG = LogFactory.getLog(CLI.class); protected Cluster cluster; + protected YarnClient client; public CLI() { } @@ -68,7 +72,13 @@ public CLI() { public CLI(Configuration conf) { setConf(conf); } - + + private void initYarnClient() { + client = new YarnClientImpl(); + client.init(getConf()); + client.start(); + } + public int run(String[] argv) throws Exception { int exitCode = -1; if (argv.length < 1) { @@ -101,6 +111,8 @@ public int run(String[] argv) throws Exception { boolean displayTasks = false; boolean killTask = false; String killReason = null; + boolean signalTask = false; + int signal = 0; boolean failTask = false; boolean setJobPriority = false; boolean logs = false; @@ -188,6 +200,15 @@ public int run(String[] argv) throws Exception { killTask = true; taskid = argv[1]; killReason = argv.length == 3 ? argv[2] : null; + } else if("-signal-task".equals(cmd)) { + if (argv.length < 2 || argv.length > 3) { + displayUsage(cmd); + return exitCode; + } + signalTask = true; + taskid = argv[1]; + // if no signal number is specified, use 3 + signal = argv.length == 3 ? 
Integer.parseInt(argv[2]) : 3; } else if("-fail-task".equals(cmd)) { if (argv.length < 2 || argv.length > 3) { displayUsage(cmd); @@ -237,7 +258,8 @@ public int run(String[] argv) throws Exception { // initialize cluster cluster = new Cluster(getConf()); - + initYarnClient(); + // Submit the request try { if (submitJobFile != null) { @@ -326,6 +348,16 @@ public int run(String[] argv) throws Exception { System.out.println("Could not kill task " + taskid); exitCode = -1; } + } else if(signalTask) { + TaskAttemptID taskID = TaskAttemptID.forName(taskid); + ContainerId containerId = cluster.getContainerId(taskID.getJobID(), taskID); + if (containerId == null) { + System.out.println("Could not find task " + taskID); + exitCode = -1; + } else { + client.signalContainer(containerId, signal); + exitCode = 0; + } } else if(failTask) { TaskAttemptID taskID = TaskAttemptID.forName(taskid); Job job = cluster.getJob(taskID.getJobID()); @@ -446,6 +478,7 @@ private void displayUsage(String cmd) { "Valid values for are " + taskStates); System.err.printf("\t[-kill-task [reason]]%n"); System.err.printf("\t[-fail-task [reason]]%n"); + System.err.printf("\t[-signal-task [signal number]]%n"); System.err.printf("\t[-logs ]%n%n"); ToolRunner.printGenericCommandUsage(System.out); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java index d0ce704..54e862a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java @@ -71,6 +71,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ClientToken; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.factories.RecordFactory; @@ -501,4 +502,33 @@ public LogParams getLogFilePath(JobID oldJobID, TaskAttemptID oldTaskAttemptID) throw new IOException("Cannot get log path for a in-progress job"); } } + + // This is only used to get the container id of a running taskattempt + // If the application has finished, we don't need to ask HS + public ContainerId getContainerId(TaskAttemptID taskAttemptID) + throws YarnRemoteException, IOException { + ApplicationReport application = rm.getApplicationReport(appId); + if (application == null || + YarnApplicationState.RUNNING != application.getYarnApplicationState()) { + throw new IOException("application isn't running: " + appId); + } + + if (taskAttemptID != null) { + GetTaskAttemptReportRequest taRequest = + recordFactory.newRecordInstance(GetTaskAttemptReportRequest.class); + taRequest.setTaskAttemptId(TypeConverter.toYarn(taskAttemptID)); + TaskAttemptReport taReport = + ((GetTaskAttemptReportResponse) invoke("getTaskAttemptReport", + GetTaskAttemptReportRequest.class, taRequest)) + .getTaskAttemptReport(); + if (taReport.getContainerId() == null + || taReport.getNodeManagerHost() == null) { + throw new IOException("Unable to get log information for task: " + 
+ taskAttemptID); + } + return taReport.getContainerId(); + } + return null; + } + } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java index 9bb08fb..079b135 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java @@ -72,6 +72,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerLogAggregationPolicy; import org.apache.hadoop.yarn.api.records.DelegationToken; @@ -642,6 +643,12 @@ public LogParams getLogFileParams(JobID jobID, TaskAttemptID taskAttemptID) return clientCache.getClient(jobID).getLogFilePath(jobID, taskAttemptID); } + @Override + public ContainerId getContainerId(JobID jobID, TaskAttemptID taskAttemptID) + throws IOException, InterruptedException { + return clientCache.getClient(jobID).getContainerId(taskAttemptID); + } + private static void warnForJavaLibPath(String opts, String component, String javaConf, String envConf) { if (opts != null && opts.contains("-Djava.library.path")) { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java index 503d188..ce3d0d7 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java @@ -91,6 +91,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest; import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -355,6 +357,13 @@ public CancelDelegationTokenResponse cancelDelegationToken( CancelDelegationTokenRequest request) throws YarnRemoteException { return null; } + + @Override + public SignalContainerResponse signalContainer( + SignalContainerRequest request) throws YarnRemoteException { + return null; + } + } class HistoryService extends AMService implements HSClientProtocol { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ClientRMProtocol.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ClientRMProtocol.java index 8a19dad..15bceb8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ClientRMProtocol.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ClientRMProtocol.java @@ -43,6 +43,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest; import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -291,4 +293,29 @@ public RenewDelegationTokenResponse renewDelegationToken( @Private public CancelDelegationTokenResponse cancelDelegationToken( CancelDelegationTokenRequest request) throws YarnRemoteException; + + /** + *
<p>
The interface used by clients to request the + * ResourceManager to signal a container. For example, + * the client can send signal 3 to dump threads of the container.
</p>
+ * + *
<p>
The client, via {@link SignalContainerRequest} provides the + * id of the container and the signal number.
</p>
+ * + *
<p>
In secure mode, the ResourceManager verifies access to the + * application before signaling the container. + * The user needs to have MODIFY_APP permission.
</p>
+ * + *
<p>
Currently, the ResourceManager returns an empty response + * on success and throws an exception on rejecting the request.
</p>
+ * + * @param request request to signal a container + * @return ResourceManager returns an empty response + * on success and throws an exception on rejecting the request + * @throws YarnRemoteException + * @see #getQueueUserAcls(GetQueueUserAclsInfoRequest) + */ + public SignalContainerResponse signalContainer( + SignalContainerRequest request) throws YarnRemoteException; + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SignalContainerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SignalContainerRequest.java new file mode 100644 index 0000000..9e1d41f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SignalContainerRequest.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.ClientRMProtocol; +import org.apache.hadoop.yarn.api.records.ContainerId; + +/** + *
<p>
The request sent by the client to the ResourceManager + * to signal a container.
</p>
+ * + *
<p>
The request includes the {@link ContainerId} of the container to be + * signaled.
</p>
+ * + * @see ClientRMProtocol#signalContainer(SignalContainerRequest) + */ +@Public +@Stable +public interface SignalContainerRequest { + /** + * Get the ContainerId of the container to signal. + * @return ContainerId of the container to signal. + */ + @Public + @Stable + public abstract ContainerId getContainerId(); + + @Public + @Stable + public abstract void setContainerId(ContainerId containerId); + + @Public + @Stable + public abstract int getSignal(); + + @Public + @Stable + public abstract void setSignal(int signal); + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SignalContainerResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SignalContainerResponse.java new file mode 100644 index 0000000..1c95c0e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SignalContainerResponse.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.yarn.api.ClientRMProtocol; + +/** + *
<p>
The response sent by the ResourceManager to the client + * signalling a container.
</p>
+ * + *
<p>
Currently, it's empty.
</p>
+ * + * @see ClientRMProtocol#signalContainer(SignalContainerRequest) + */ +@Public +@Stable +public interface SignalContainerResponse { + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SignalContainerRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SignalContainerRequestPBImpl.java new file mode 100644 index 0000000..0e791ae --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SignalContainerRequestPBImpl.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; + + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ProtoBase; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProtoOrBuilder; +import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; + + +public class SignalContainerRequestPBImpl + extends ProtoBase implements SignalContainerRequest { + SignalContainerRequestProto proto = SignalContainerRequestProto.getDefaultInstance(); + SignalContainerRequestProto.Builder builder = null; + boolean viaProto = false; + + private ContainerId containerId; + + public SignalContainerRequestPBImpl() { + builder = SignalContainerRequestProto.newBuilder(); + } + + public SignalContainerRequestPBImpl(SignalContainerRequestProto proto) { + this.proto = proto; + viaProto = true; + } + + public SignalContainerRequestProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + private void mergeLocalToBuilder() { + if (this.containerId != null) { + builder.setContainerId(convertToProtoFormat(this.containerId)); + } + } + + private void mergeLocalToProto() { + if (viaProto) + maybeInitBuilder(); + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = SignalContainerRequestProto.newBuilder(proto); + } + viaProto = false; + } + + + @Override + public ContainerId getContainerId() { + SignalContainerRequestProtoOrBuilder p = viaProto ? 
proto : builder; + if (this.containerId != null) { + return this.containerId; + } + if (!p.hasContainerId()) { + return null; + } + this.containerId = convertFromProtoFormat(p.getContainerId()); + return this.containerId; + } + + @Override + public void setContainerId(ContainerId containerId) { + maybeInitBuilder(); + if (containerId == null) + builder.clearContainerId(); + this.containerId = containerId; + } + + @Override + public int getSignal() { + SignalContainerRequestProtoOrBuilder p = viaProto ? proto : builder; + return p.getSignal(); + + } + + @Override + public void setSignal(int signal) { + maybeInitBuilder(); + builder.setSignal(signal); + } + + private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) { + return new ContainerIdPBImpl(p); + } + + private ContainerIdProto convertToProtoFormat(ContainerId t) { + return ((ContainerIdPBImpl)t).getProto(); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SignalContainerResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SignalContainerResponsePBImpl.java new file mode 100644 index 0000000..bcf5a03 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SignalContainerResponsePBImpl.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; + + +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse; +import org.apache.hadoop.yarn.api.records.ProtoBase; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerResponseProto; + + + +public class SignalContainerResponsePBImpl + extends ProtoBase implements SignalContainerResponse { + SignalContainerResponseProto proto = SignalContainerResponseProto.getDefaultInstance(); + SignalContainerResponseProto.Builder builder = null; + boolean viaProto = false; + + public SignalContainerResponsePBImpl() { + builder = SignalContainerResponseProto.newBuilder(); + } + + public SignalContainerResponsePBImpl(SignalContainerResponseProto proto) { + this.proto = proto; + viaProto = true; + } + + public SignalContainerResponseProto getProto() { + proto = viaProto ? 
proto : builder.build(); + viaProto = true; + return proto; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = SignalContainerResponseProto.newBuilder(proto); + } + viaProto = false; + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/client_RM_protocol.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/client_RM_protocol.proto index 5aa2380..1a9cdf5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/client_RM_protocol.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/client_RM_protocol.proto @@ -37,5 +37,6 @@ service ClientRMProtocolService { rpc getDelegationToken(hadoop.common.GetDelegationTokenRequestProto) returns (hadoop.common.GetDelegationTokenResponseProto); rpc renewDelegationToken(hadoop.common.RenewDelegationTokenRequestProto) returns (hadoop.common.RenewDelegationTokenResponseProto); rpc cancelDelegationToken(hadoop.common.CancelDelegationTokenRequestProto) returns (hadoop.common.CancelDelegationTokenResponseProto); + rpc signalContainer(SignalContainerRequestProto) returns (SignalContainerResponseProto); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index b073299..2f346d5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -141,6 +141,14 @@ message GetQueueUserAclsInfoResponseProto { repeated QueueUserACLInfoProto queueUserAcls = 1; } +message SignalContainerRequestProto { + optional ContainerIdProto container_id = 1; + optional int32 signal = 2; +} + +message SignalContainerResponseProto { +} + ////////////////////////////////////////////////////// /////// client_NM_Protocol /////////////////////////// diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClient.java index edc35a9..dc135fc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClient.java @@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.DelegationToken; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.QueueInfo; @@ -236,4 +237,19 @@ ApplicationReport getApplicationReport(ApplicationId appId) * @throws YarnRemoteException */ List getQueueAclsInfo() throws YarnRemoteException; + + /** + *
<p>
+ * Signal a container identified by given ID. + *
</p>
+ * + * @param containerId + * {@link ContainerId} of the container that needs to be signaled + * @param signal the signal number + * @throws YarnRemoteException + * in case of errors or if YARN rejects the request due to + * access-control restrictions. + */ + void signalContainer(ContainerId containerId, int signal) throws YarnRemoteException; + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClientImpl.java index 5ee3cd7..27e0584 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClientImpl.java @@ -45,10 +45,12 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.DelegationToken; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.QueueInfo; @@ -278,4 +280,15 @@ private void getChildQueues(QueueInfo parent, List queues, } } } + + @Override + public void signalContainer(ContainerId containerId, int signal) + throws YarnRemoteException { + LOG.info("Signalling container " + containerId); + SignalContainerRequest request = + Records.newRecord(SignalContainerRequest.class); + request.setContainerId(containerId); + request.setSignal(signal); + rmClient.signalContainer(request); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java index 7156b43..997a888 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java @@ -30,6 +30,7 @@ import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.util.ConverterUtils; @@ -57,6 +58,11 @@ public int run(String[] args) throws Exception { killOption.setArgs(2); killOption.setArgName("Application ID [reason]"); opts.addOption(killOption); + Option signalOption = new Option(SIGNAL_CMD, true, + "Signal the container. 
Default signal number is 3."); + signalOption.setArgs(2); + signalOption.setArgName("container ID [signal number]"); + opts.addOption(signalOption); CommandLine cliParser = new GnuParser().parse(opts, args); @@ -76,6 +82,18 @@ public int run(String[] args) throws Exception { } final String[] killArgs = cliParser.getOptionValues(KILL_CMD); killApplication(killArgs[0], killArgs.length == 2 ? killArgs[1] : null); + } else if (cliParser.hasOption(SIGNAL_CMD)) { + if (args.length < 2 || args.length > 3) { + printUsage(opts); + return exitCode; + } + final String[] signalArgs = cliParser.getOptionValues(SIGNAL_CMD); + final String containerId = signalArgs[0]; + int signal = 3; + if (signalArgs.length == 2) { + signal = Integer.parseInt(signalArgs[1]); + } + signalContainer(containerId, signal); } else { syserr.println("Invalid Command Usage : "); printUsage(opts); @@ -129,6 +147,20 @@ private void killApplication(String applicationId, String reason) } /** + * Signals the containerId + * + * @param containerIdStr the container id + * @param signal the signal number + * @throws YarnRemoteException + */ + private void signalContainer(String containerIdStr, int signal) + throws YarnRemoteException { + ContainerId containerId = ConverterUtils.toContainerId(containerIdStr); + sysout.println("Signalling container " + containerIdStr); + client.signalContainer(containerId, signal); + } + + /** * Prints the application report for an application id. * * @param applicationId diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/YarnCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/YarnCLI.java index a36e671..ee18fee 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/YarnCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/YarnCLI.java @@ -30,6 +30,7 @@ public static final String STATUS_CMD = "status"; public static final String LIST_CMD = "list"; public static final String KILL_CMD = "kill"; + public static final String SIGNAL_CMD = "signal"; protected PrintStream sysout; protected PrintStream syserr; protected YarnClient client; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ClientRMProtocolPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ClientRMProtocolPBClientImpl.java index 25212b8..9032684 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ClientRMProtocolPBClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ClientRMProtocolPBClientImpl.java @@ -54,6 +54,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.CancelDelegationTokenRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.CancelDelegationTokenResponsePBImpl; import 
org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetAllApplicationsRequestPBImpl; @@ -76,6 +78,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.KillApplicationResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RenewDelegationTokenRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RenewDelegationTokenResponsePBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationResponsePBImpl; import org.apache.hadoop.yarn.exceptions.YarnRemoteException; @@ -88,6 +92,7 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetQueueInfoRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetQueueUserAclsInfoRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.KillApplicationRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationRequestProto; import com.google.protobuf.ServiceException; @@ -268,4 +273,17 @@ public CancelDelegationTokenResponse cancelDelegationToken( throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e); } } + + @Override + public SignalContainerResponse signalContainer( + SignalContainerRequest request) throws YarnRemoteException { + SignalContainerRequestProto requestProto = + ((SignalContainerRequestPBImpl) request).getProto(); + try { + return new SignalContainerResponsePBImpl( + proxy.signalContainer(null, requestProto)); + } catch (ServiceException e) { + throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ClientRMProtocolPBServiceImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ClientRMProtocolPBServiceImpl.java index 1c2d5b0..5a74bc5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ClientRMProtocolPBServiceImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ClientRMProtocolPBServiceImpl.java @@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.CancelDelegationTokenRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.CancelDelegationTokenResponsePBImpl; @@ -60,6 +61,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.KillApplicationResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RenewDelegationTokenRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RenewDelegationTokenResponsePBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerResponsePBImpl; 
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationResponsePBImpl; import org.apache.hadoop.yarn.exceptions.YarnRemoteException; @@ -79,6 +82,8 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetQueueUserAclsInfoResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.KillApplicationRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.KillApplicationResponseProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationResponseProto; @@ -250,4 +255,17 @@ public CancelDelegationTokenResponseProto cancelDelegationToken( throw new ServiceException(e); } } + + @Override + public SignalContainerResponseProto signalContainer(RpcController arg0, + SignalContainerRequestProto proto) throws ServiceException { + SignalContainerRequestPBImpl request = new SignalContainerRequestPBImpl(proto); + try { + SignalContainerResponse response = real.signalContainer(request); + return ((SignalContainerResponsePBImpl)response).getProto(); + } catch (YarnRemoteException e) { + throw new ServiceException(e); + } + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/HeartbeatResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/HeartbeatResponse.java index 4536934..17e154d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/HeartbeatResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/HeartbeatResponse.java @@ -19,6 +19,7 @@ import java.util.List; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -33,7 +34,7 @@ List getApplicationsToCleanupList(); ApplicationId getApplicationsToCleanup(int index); int getApplicationsToCleanupCount(); - + void setResponseId(int responseId); void setNodeAction(NodeAction action); @@ -49,4 +50,7 @@ void addApplicationToCleanup(ApplicationId applicationId); void removeApplicationToCleanup(int index); void clearApplicationsToCleanup(); + + List getContainersToSignalList(); + void addAllContainersToSignal(List containers); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/HeartbeatResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/HeartbeatResponsePBImpl.java index 8a7d890..71300cc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/HeartbeatResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/HeartbeatResponsePBImpl.java @@ -23,6 
+23,8 @@ import java.util.Iterator; import java.util.List; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerRequestPBImpl; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ProtoBase; @@ -34,6 +36,7 @@ import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.HeartbeatResponseProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProto; import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeAction; @@ -46,6 +49,7 @@ private List containersToCleanup = null; private List applicationsToCleanup = null; + private List containersToSignal = null; private MasterKey masterKey = null; public HeartbeatResponsePBImpl() { @@ -75,6 +79,9 @@ private void mergeLocalToBuilder() { if (this.masterKey != null) { builder.setMasterKey(convertToProtoFormat(this.masterKey)); } + if (this.containersToSignal != null) { + addContainersToSignalToProto(); + } } private void mergeLocalToProto() { @@ -324,6 +331,76 @@ private ContainerIdProto convertToProtoFormat(ContainerId t) { return ((ContainerIdPBImpl)t).getProto(); } + @Override + public List getContainersToSignalList() { + initContainersToSignal(); + return this.containersToSignal; + } + + private void initContainersToSignal() { + if (this.containersToSignal != null) { + return; + } + HeartbeatResponseProtoOrBuilder p = viaProto ? 
proto : builder; + List list = p.getContainersToSignalList(); + this.containersToSignal = new ArrayList(); + + for (SignalContainerRequestProto c : list) { + this.containersToSignal.add(convertFromProtoFormat(c)); + } + } + + @Override + public void addAllContainersToSignal( + final List containersToSignal) { + if (containersToSignal == null) + return; + initContainersToSignal(); + this.containersToSignal.addAll(containersToSignal); + } + + private void addContainersToSignalToProto() { + maybeInitBuilder(); + builder.clearContainersToSignal(); + if (containersToSignal == null) + return; + + Iterable iterable = + new Iterable() { + @Override + public Iterator iterator() { + return new Iterator() { + Iterator iter = containersToSignal.iterator(); + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public SignalContainerRequestProto next() { + return convertToProtoFormat(iter.next()); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }; + builder.addAllContainersToSignal(iterable); + } + + private SignalContainerRequestPBImpl convertFromProtoFormat( + SignalContainerRequestProto p) { + return new SignalContainerRequestPBImpl(p); + } + + private SignalContainerRequestProto convertToProtoFormat( + SignalContainerRequest t) { + return ((SignalContainerRequestPBImpl)t).getProto(); + } + private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) { return new ApplicationIdPBImpl(p); } @@ -331,7 +408,7 @@ private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) { private ApplicationIdProto convertToProtoFormat(ApplicationId t) { return ((ApplicationIdPBImpl)t).getProto(); } - + private NodeAction convertFromProtoFormat(NodeActionProto p) { return NodeAction.valueOf(p.name()); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto index 71f5b1b..53019ea 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto @@ -22,6 +22,7 @@ option java_generic_services = true; option java_generate_equals_and_hash = true; import "yarn_protos.proto"; +import "yarn_service_protos.proto"; enum NodeActionProto { NORMAL = 0; @@ -53,5 +54,6 @@ message HeartbeatResponseProto { optional NodeActionProto nodeAction = 3; repeated ContainerIdProto containers_to_cleanup = 4; repeated ApplicationIdProto applications_to_cleanup = 5; + repeated SignalContainerRequestProto containers_to_signal = 6; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/CMgrSignalContainersEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/CMgrSignalContainersEvent.java new file mode 100644 index 0000000..cc967b9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/CMgrSignalContainersEvent.java @@ -0,0 +1,38 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. 
See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.nodemanager; + +import java.util.List; + +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; + +public class CMgrSignalContainersEvent extends ContainerManagerEvent { + + private List containerToSignal; + + public CMgrSignalContainersEvent(List containerToSignal) { + super(ContainerManagerEventType.SIGNAL_CONTAINERS); + this.containerToSignal = containerToSignal; + } + + public List getContainersToSignal() { + return this.containerToSignal; + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java index 9cffde1..36303d4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java @@ -158,6 +158,18 @@ public int getValue() { public String toString() { return str; } + + public static Signal signalOf(int value) { + if (value == QUIT.value) { + return QUIT; + } else if (value == KILL.value) { + return KILL; + } else if (value == TERM.value) { + return TERM; + } else { + return NULL; + } + } } protected void logOutput(String output) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerManagerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerManagerEventType.java index 4278ce0..8a7e423 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerManagerEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerManagerEventType.java @@ -21,4 +21,5 @@ public enum ContainerManagerEventType { FINISH_APPS, FINISH_CONTAINERS, + SIGNAL_CONTAINERS, } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 841e169..7499e7b 100644 --- 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index 841e169..7499e7b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
@@ -374,7 +375,7 @@ public void run() {
               .getContainersToCleanupList();
           if (containersToCleanup.size() != 0) {
             dispatcher.getEventHandler().handle(
-                new CMgrCompletedContainersEvent(containersToCleanup, 
+                new CMgrCompletedContainersEvent(containersToCleanup,
                     CMgrCompletedContainersEvent.Reason.BY_RESOURCEMANAGER));
           }
           List<ApplicationId> appsToCleanup =
@@ -385,6 +386,16 @@ public void run() {
             dispatcher.getEventHandler().handle(
                 new CMgrCompletedAppsEvent(appsToCleanup));
           }
+
+          // A SignalContainer request originally comes from an end user via
+          // ClientRMProtocol's signalContainer. It is forwarded here to the
+          // ContainerManager, which in turn dispatches the event to the
+          // ContainersLauncher.
+          List<SignalContainerRequest> containersToSignal = response
+              .getContainersToSignalList();
+          if (containersToSignal.size() != 0) {
+            dispatcher.getEventHandler().handle(
+                new CMgrSignalContainersEvent(containersToSignal));
+          }
         } catch (Throwable e) {
           // TODO Better error handling. Thread can die with the rest of the
           // NM still running.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index fafbf0e..48c2ef7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.api.ContainerManager;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
@@ -64,6 +65,7 @@ import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
 import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedContainersEvent;
+import org.apache.hadoop.yarn.server.nodemanager.CMgrSignalContainersEvent;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerManagerEvent;
 import
org.apache.hadoop.yarn.server.nodemanager.Context; @@ -72,6 +74,7 @@ import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger; import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; +import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationContainerInitEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; @@ -86,6 +89,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.SignalContainersLauncherEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService; @@ -608,6 +612,24 @@ public void handle(ContainerManagerEvent event) { new ContainerKillEvent(container, diagnostic)); } break; + case SIGNAL_CONTAINERS: + CMgrSignalContainersEvent containersSignalEvent = + (CMgrSignalContainersEvent) event; + for (SignalContainerRequest request : containersSignalEvent + .getContainersToSignal()) { + ContainerId containerId = request.getContainerId(); + Container container = this.context.getContainers().get(containerId); + if (container != null) { + LOG.info("Container " + containerId + " signal request by ResourceManager"); + this.dispatcher.getEventHandler().handle( + new SignalContainersLauncherEvent(container, + Signal.signalOf(request.getSignal()))); + } else { + LOG.info("Container " + containerId + " no longer exists"); + } + } + break; + default: LOG.warn("Invalid event " + event.getType() + ". 
Ignoring."); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java index acfd2a7..300076f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java @@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal; public interface Container extends EventHandler { @@ -46,4 +47,6 @@ String toString(); String localizationCountersAsString(); + + void Signal(Signal signal); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index e9ff5ec..5e63da0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal; import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger; import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent; @@ -56,6 +57,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationContainerFinishedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.SignalContainersLauncherEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent; @@ -952,4 +954,10 @@ public String localizationCountersAsString() { return result.toString(); } + @Override + public void Signal(Signal signal) { + dispatcher.getEventHandler().handle( + new SignalContainersLauncherEvent(this, signal)); + } + } diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
index edbb44d..ba41f3a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
@@ -378,6 +378,56 @@ public void cleanupContainer(boolean dumpThreads) throws IOException {
   }
 
   /**
+   * Send a signal to the container.
+   *
+   * Acceptable signals: {@link Signal#QUIT}
+   *
+   * @throws IOException
+   */
+  @SuppressWarnings("unchecked") // dispatcher not typed
+  public void signalContainer(Signal signal) throws IOException {
+    ContainerId containerId = container.getContainerID();
+    String containerIdStr = ConverterUtils.toString(containerId);
+    String user = container.getUser();
+
+    LOG.info("Sending signal " + signal + " to container " + containerIdStr);
+
+    boolean alreadyLaunched = !shouldLaunchContainer.compareAndSet(false, true);
+    if (!alreadyLaunched) {
+      LOG.info("Container " + containerIdStr + " not launched."
+          + " Not sending the signal");
+      return;
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Getting pid for container " + containerIdStr
+          + " to send signal to from pid file "
+          + (pidFilePath != null ? pidFilePath.toString() : "null"));
+    }
+
+    try {
+      // get process id from pid file if available
+      // else if shell is still active, get it from the shell
+      String processId = null;
+      if (pidFilePath != null) {
+        processId = getContainerPid(pidFilePath);
+      }
+
+      if (processId != null) {
+        LOG.info("Sending signal " + signal + " to pid " + processId
+            + " as user " + user
+            + " for container " + containerIdStr);
+        exec.signalContainer(user, processId, signal);
+      }
+    } catch (Exception e) {
+      String message =
+          "Exception when sending signal to container " + containerIdStr
+              + ": " + StringUtils.stringifyException(e);
+      LOG.warn(message);
+    }
+  }
+
+  /**
    * Loop through for a time-bounded interval waiting to
    * read the process id from a file generated by a running process.
    * @param pidFilePath File from which to read the process id
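For context (illustration only, not part of the patch): once the pid is known, delivering Signal.QUIT amounts to running "kill -3 <pid>" as the container's user, and a JVM that receives SIGQUIT writes a full thread dump to its stdout. That is why the NodeManager web page added later in this patch points the user at the container's stdout log after signaling. A minimal sketch of the call the launcher makes above, using the existing executor method:

    import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
    import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;

    public class DumpThreadsSketch {
      // Equivalent of "kill -3 <pid>", executed as the container's user.
      static void dumpThreads(ContainerExecutor exec, String user, String pid)
          throws Exception {
        exec.signalContainer(user, pid, Signal.QUIT);
      }
    }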
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
index 1f3fd21..1f06b71 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
@@ -147,6 +147,23 @@ public void handle(ContainersLauncherEvent event) {
             + ". Ignoring.");
       }
       break;
+    case SIGNAL_CONTAINER:
+      SignalContainersLauncherEvent signalEvent =
+          (SignalContainersLauncherEvent) event;
+      RunningContainer runningContainer = running.get(containerId);
+      if (runningContainer == null) {
+        // Container not launched. So nothing needs to be done.
+        LOG.info("Container " + containerId + " not running, nothing to signal.");
+        return;
+      }
+
+      try {
+        runningContainer.launcher.signalContainer(signalEvent.getSignal());
+      } catch (IOException e) {
+        LOG.warn("Got exception while signaling container " + containerId
+            + ". Ignoring.");
+      }
+      break;
     }
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java
index 6793bf7..92e1fcc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java
@@ -21,4 +21,5 @@ public enum ContainersLauncherEventType {
   LAUNCH_CONTAINER,
   CLEANUP_CONTAINER, // The process(grp) itself.
+  SIGNAL_CONTAINER,
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/SignalContainersLauncherEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/SignalContainersLauncherEvent.java
new file mode 100644
index 0000000..d5ed469
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/SignalContainersLauncherEvent.java
@@ -0,0 +1,37 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher; + +import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; + +// This event can be triggered by one of the following flows +// WebUI -> Container +// CLI -> ClientRMProtocol -> NM HeartbeatResponse -> ContainerManager +public class SignalContainersLauncherEvent extends ContainersLauncherEvent{ + + private final Signal signal; + public SignalContainersLauncherEvent(Container container, Signal signal) { + super(container, ContainersLauncherEventType.SIGNAL_CONTAINER); + this.signal = signal; + } + public Signal getSignal() { + return signal; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/AllContainersPage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/AllContainersPage.java index 1bbb945..8eefb43 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/AllContainersPage.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/AllContainersPage.java @@ -52,7 +52,7 @@ private String containersTableInit() { return tableInit(). // containerid, containerid, log-url - append(", aoColumns:[null, null, {bSearchable:false}]} ").toString(); + append(", aoColumns:[null, null, {bSearchable:false}, null]} ").toString(); } @Override @@ -79,6 +79,7 @@ protected void render(Block html) { .td()._("ContainerId")._() .td()._("ContainerState")._() .td()._("logs")._() + .td()._("DumpThreads")._() ._() ._().tbody(); for (Entry entry : this.nmContext @@ -91,6 +92,8 @@ protected void render(Block html) { .td()._(info.getState())._() .td() .a(url(info.getShortLogLink()), "logs")._() + .td() + .a(url(info.getShortDumpThreadsLink()), "Click to dump threads")._() ._(); } tableBody._()._()._(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerDumpThreadsPage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerDumpThreadsPage.java new file mode 100644 index 0000000..d266fa3 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerDumpThreadsPage.java @@ -0,0 +1,136 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.webapp;
+
+import static org.apache.hadoop.yarn.webapp.view.JQueryUI.ACCORDION;
+import static org.apache.hadoop.yarn.webapp.view.JQueryUI.initID;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
+import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.ContainerInfo;
+import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.webapp.SubView;
+import org.apache.hadoop.yarn.webapp.YarnWebParams;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.DIV;
+import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
+
+import com.google.inject.Inject;
+
+public class ContainerDumpThreadsPage extends NMView implements YarnWebParams {
+
+  @Override
+  protected void preHead(Page.HTML<_> html) {
+    commonPreHead(html);
+
+    setTitle("Thread Dump for " + $(CONTAINER_ID));
+    set(initID(ACCORDION, "nav"), "{autoHeight:false, active:0}");
+  }
+
+  @Override
+  protected Class<? extends SubView> content() {
+    return ContainerBlock.class;
+  }
+
+  public static class ContainerBlock extends HtmlBlock implements YarnWebParams {
+
+    private final Context nmContext;
+    private final ApplicationACLsManager aclsManager;
+
+    @Inject
+    public ContainerBlock(Context nmContext, ApplicationACLsManager aclsManager) {
+      this.nmContext = nmContext;
+      this.aclsManager = aclsManager;
+    }
+
+    @Override
+    protected void render(Block html) {
+      ContainerId containerID;
+      try {
+        containerID = ConverterUtils.toContainerId($(CONTAINER_ID));
+      } catch (IllegalArgumentException e) {
+        html.p()._("Invalid containerId " + $(CONTAINER_ID))._();
+        return;
+      }
+
+      DIV<Hamlet> div = html.div("#content");
+
+      ApplicationId applicationId = containerID.getApplicationAttemptId()
+          .getApplicationId();
+      Application application = this.nmContext.getApplications().get(
+          applicationId);
+      if (application == null) {
+        html.h1(
+            "Unknown container. Container either has not started or "
+                + "has already completed or doesn't belong to this node at all.");
+        return;
+      }
+
+      Container container = this.nmContext.getContainers().get(containerID);
+      if (container == null) {
+        div.h1("Unknown Container. Container might have completed, "
+            + "please go back to the previous page and retry.")._();
+        return;
+      }
+
+      if (container.getContainerState() != ContainerState.RUNNING &&
+          container.getContainerState() != ContainerState.KILLING) {
+        html.h1("Container is not running.");
+        return;
+      }
+
+      dumpThreads(html, container, applicationId, application);
+    }
+
+    private void dumpThreads(Block html, Container container,
+        ApplicationId applicationId, Application application) {
+      // Check for the authorization.
+ String remoteUser = request().getRemoteUser(); + UserGroupInformation callerUGI = null; + + if (remoteUser != null) { + callerUGI = UserGroupInformation.createRemoteUser(remoteUser); + } + if (callerUGI != null && !this.aclsManager.checkAccess(callerUGI, + ApplicationAccessType.MODIFY_APP, application.getUser(), + applicationId)) { + html.h1( + "User [" + remoteUser + + "] is not authorized to signal the container " + + container.getContainerID()); + return; + } + + container.Signal(ContainerExecutor.Signal.QUIT); + ContainerInfo info = new ContainerInfo(this.nmContext, container); + String logUrl = info.getShortLogLink(); + html.p().a(".logslink", url(logUrl, "stdout", "?start=-4096"), + "Threads Dump captured successfully. Click here for the result.")._(); + return; + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerPage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerPage.java index 060d72a..c065710 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerPage.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerPage.java @@ -85,7 +85,8 @@ protected void render(Block html) { ._("Diagnostics", info.getDiagnostics()) ._("User", info.getUser()) ._("TotalMemoryNeeded", info.getMemoryNeeded()) - ._("logs", info.getShortLogLink(), "Link to logs"); + ._("logs", info.getShortLogLink(), "Link to logs") + ._("Dump Threads", info.getShortDumpThreadsLink(), "Click to dump threads"); html._(InfoBlock.class); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMController.java index 86e2505..c2ed70f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMController.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMController.java @@ -103,4 +103,9 @@ public void logs() { } render(ContainerLogsPage.class); } + + public void containerDumpThreads() { + render(ContainerDumpThreadsPage.class); + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java index 3128217..0250736 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java @@ -118,6 +118,9 @@ public void setup() { route( pajoin("/containerlogs", CONTAINER_ID, 
APP_OWNER, CONTAINER_LOG_TYPE), NMController.class, "logs"); + route( + pajoin("/containerdumpthreads", CONTAINER_ID, APP_OWNER), NMController.class, + "containerDumpThreads"); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/dao/ContainerInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/dao/ContainerInfo.java index 41c649e..7d5fa79 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/dao/ContainerInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/dao/ContainerInfo.java @@ -48,6 +48,8 @@ protected String containerLogsShortLink; @XmlTransient protected String exitStatus; + @XmlTransient + protected String containerDumpThreadsShortLink; public ContainerInfo() { } // JAXB needs this @@ -78,6 +80,8 @@ public ContainerInfo(final Context nmContext, final Container container, } this.containerLogsShortLink = ujoin("containerlogs", this.id, container.getUser()); + this.containerDumpThreadsShortLink = ujoin("containerdumpthreads", this.id, + container.getUser()); if (requestUri == null) { requestUri = ""; @@ -129,4 +133,8 @@ public long getMemoryNeeded() { return this.totalMemoryNeededMB; } + public String getShortDumpThreadsLink() { + return this.containerDumpThreadsShortLink; + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index d65b096..ca72adb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -39,6 +39,7 @@ import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.yarn.YarnException; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -107,9 +108,15 @@ public void tearDown() { private class MyResourceTracker implements ResourceTracker { private final Context context; + private boolean signalContainer; public MyResourceTracker(Context context) { + this(context, false); + } + + public MyResourceTracker(Context context, boolean signalContainer) { this.context = context; + this.signalContainer = signalContainer; } @Override @@ -167,6 +174,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) nodeStatus.setResponseId(heartBeatID++); Map> appToContainers = getAppToContainerStatusMap(nodeStatus.getContainersStatuses()); + List containersToSignal = null; if (heartBeatID == 1) { Assert.assertEquals(0, nodeStatus.getContainersStatuses().size()); @@ -194,6 +202,15 @@ public NodeHeartbeatResponse 
nodeHeartbeat(NodeHeartbeatRequest request) this.context.getContainers(); Assert.assertEquals(1, activeContainers.size()); + if (this.signalContainer) { + containersToSignal = new ArrayList(); + SignalContainerRequest signalReq = recordFactory + .newRecordInstance(SignalContainerRequest.class); + signalReq.setContainerId(firstContainerID); + signalReq.setSignal(3); + containersToSignal.add(signalReq); + } + // Give another container to the NM. applicationID.setId(heartBeatID); appAttemptID.setApplicationId(applicationID); @@ -221,7 +238,9 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) HeartbeatResponse response = recordFactory .newRecordInstance(HeartbeatResponse.class); response.setResponseId(heartBeatID); - + if (containersToSignal != null) { + response.addAllContainersToSignal(containersToSignal); + } NodeHeartbeatResponse nhResponse = recordFactory .newRecordInstance(NodeHeartbeatResponse.class); nhResponse.setHeartbeatResponse(response); @@ -229,14 +248,41 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) } } + private class MyContainerManager extends ContainerManagerImpl { + public boolean signaled = false; + + public MyContainerManager(Context context, ContainerExecutor exec, + DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater, + NodeManagerMetrics metrics, ApplicationACLsManager aclsManager, + LocalDirsHandlerService dirsHandler) { + super(context, exec, deletionContext, nodeStatusUpdater, + metrics, aclsManager, dirsHandler); + } + + @Override + public void handle(ContainerManagerEvent event) { + if (event.getType() == ContainerManagerEventType.SIGNAL_CONTAINERS) { + signaled = true; + } + } + + } + private class MyNodeStatusUpdater extends NodeStatusUpdaterImpl { - public ResourceTracker resourceTracker = new MyResourceTracker(this.context); + public ResourceTracker resourceTracker; private Context context; public MyNodeStatusUpdater(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { + this(context, dispatcher, healthChecker, metrics, false); + } + + public MyNodeStatusUpdater(Context context, Dispatcher dispatcher, + NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics, + boolean signalContainer) { super(context, dispatcher, healthChecker, metrics); this.context = context; + this.resourceTracker = new MyResourceTracker(this.context, signalContainer); } @Override @@ -309,7 +355,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) .newRecordInstance(HeartbeatResponse.class); response.setResponseId(heartBeatID); response.setNodeAction(heartBeatNodeAction); - + NodeHeartbeatResponse nhResponse = recordFactory .newRecordInstance(NodeHeartbeatResponse.class); nhResponse.setHeartbeatResponse(response); @@ -394,8 +440,18 @@ public void testNMRegistration() throws InterruptedException { protected NodeStatusUpdater createNodeStatusUpdater(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { return new MyNodeStatusUpdater(context, dispatcher, healthChecker, - metrics); + metrics); + } + @Override + protected ContainerManagerImpl createContainerManager(Context context, + ContainerExecutor exec, DeletionService del, + NodeStatusUpdater nodeStatusUpdater, + ApplicationACLsManager aclsManager, + LocalDirsHandlerService diskhandler) { + return new MyContainerManager(context, exec, del, nodeStatusUpdater, + metrics, aclsManager, diskhandler); } + }; YarnConfiguration conf = createNMConfig(); @@ 
-446,7 +502,68 @@ public void run() { nm.stop(); } - + + //Verify that signalContainer request can be dispatched from + //NodeStatusUpdaterImpl to ContainerManagerImpl. + @Test + public void testSignalContainerToContainerManager() throws Exception { + nm = new NodeManager() { + @Override + protected NodeStatusUpdater createNodeStatusUpdater(Context context, + Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + return new MyNodeStatusUpdater( + context, dispatcher, healthChecker, metrics, true); + } + + @Override + protected ContainerManagerImpl createContainerManager(Context context, + ContainerExecutor exec, DeletionService del, + NodeStatusUpdater nodeStatusUpdater, + ApplicationACLsManager aclsManager, + LocalDirsHandlerService diskhandler) { + return new MyContainerManager(context, exec, del, nodeStatusUpdater, + metrics, aclsManager, diskhandler); + } + + }; + + YarnConfiguration conf = createNMConfig(); + nm.init(conf); + nm.start(); + + System.out.println(" ----- thread already started.." + + nm.getServiceState()); + + int waitCount = 0; + while (nm.getServiceState() == STATE.INITED && waitCount++ != 20) { + LOG.info("Waiting for NM to start.."); + if (nmStartError != null) { + LOG.error("Error during startup. ", nmStartError); + Assert.fail(nmStartError.getCause().getMessage()); + } + Thread.sleep(1000); + } + if (nm.getServiceState() != STATE.STARTED) { + // NM could have failed. + Assert.fail("NodeManager failed to start"); + } + + waitCount = 0; + while (heartBeatID <= 3 && waitCount++ != 20) { + Thread.sleep(500); + } + Assert.assertFalse(heartBeatID <= 3); + Assert.assertEquals("Number of registered NMs is wrong!!", 1, + this.registeredNodes.size()); + + MyContainerManager containerManager = + (MyContainerManager)nm.getContainerManager(); + Assert.assertTrue(containerManager.signaled); + + nm.stop(); + + } + @Test public void testStopReentrant() throws Exception { final AtomicInteger numCleanups = new AtomicInteger(0); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java index 5b01cc0..cf8c220 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java @@ -18,6 +18,11 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; + import java.io.BufferedReader; import java.io.File; import java.io.FileReader; @@ -35,7 +40,9 @@ import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.UnsupportedFileSystemException; +import org.apache.hadoop.yarn.api.ContainerManager; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import 
org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -51,6 +58,8 @@ import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent; +import org.apache.hadoop.yarn.server.nodemanager.CMgrSignalContainersEvent; +import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; @@ -494,4 +503,98 @@ public void testLocalFilesCleanup() throws InterruptedException, Assert.assertFalse(targetFile.getAbsolutePath() + " exists!!", targetFile.exists()); } + + @Test + public void testContainerLaunchAndSignal() throws IOException, + InterruptedException { + + ContainerExecutor executorSpy = spy(exec); + containerManager = + new ContainerManagerImpl(context, executorSpy, delSrvc, nodeStatusUpdater, + metrics, new ApplicationACLsManager(conf), dirsHandler); + containerManager.init(conf); + containerManager.start(); + + File scriptFile = new File(tmpDir, "scriptFile.sh"); + PrintWriter fileWriter = new PrintWriter(scriptFile); + File processStartFile = + new File(tmpDir, "start_file.txt").getAbsoluteFile(); + fileWriter.write("\numask 0"); // So that start file is readable by the test + fileWriter.write("\necho Hello World! > " + processStartFile); + fileWriter.write("\necho $$ >> " + processStartFile); + fileWriter.write("\nexec sleep 1000s"); + fileWriter.close(); + + ContainerLaunchContext containerLaunchContext = + recordFactory.newRecordInstance(ContainerLaunchContext.class); + + // ////// Construct the Container-id + ContainerId cId = createContainerId(); + containerLaunchContext.setContainerId(cId); + + containerLaunchContext.setUser(user); + + URL resource_alpha = + ConverterUtils.getYarnUrlFromPath(localFS + .makeQualified(new Path(scriptFile.getAbsolutePath()))); + LocalResource rsrc_alpha = + recordFactory.newRecordInstance(LocalResource.class); + rsrc_alpha.setResource(resource_alpha); + rsrc_alpha.setSize(-1); + rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION); + rsrc_alpha.setType(LocalResourceType.FILE); + rsrc_alpha.setTimestamp(scriptFile.lastModified()); + String destinationFile = "dest_file"; + Map localResources = + new HashMap(); + localResources.put(destinationFile, rsrc_alpha); + containerLaunchContext.setLocalResources(localResources); + containerLaunchContext.setUser(containerLaunchContext.getUser()); + List commands = new ArrayList(); + commands.add("/bin/bash"); + commands.add(scriptFile.getAbsolutePath()); + containerLaunchContext.setCommands(commands); + containerLaunchContext.setResource(recordFactory + .newRecordInstance(Resource.class)); + containerLaunchContext.getResource().setMemory(100 * 1024 * 1024); + StartContainerRequest startRequest = recordFactory.newRecordInstance(StartContainerRequest.class); + startRequest.setContainerLaunchContext(containerLaunchContext); + containerManager.startContainer(startRequest); + + int timeoutSecs = 0; + while (!processStartFile.exists() && timeoutSecs++ < 20) { + Thread.sleep(1000); + LOG.info("Waiting for process start-file to be created"); + } + Assert.assertTrue("ProcessStartFile doesn't exist!", + processStartFile.exists()); + + // Simulate NodeStatusUpdaterImpl sending CMgrSignalContainersEvent + 
SignalContainerRequest signalContainerRequest = + recordFactory.newRecordInstance(SignalContainerRequest.class); + signalContainerRequest.setContainerId(cId); + signalContainerRequest.setSignal(3); + List reqs = new ArrayList(); + reqs.add(signalContainerRequest); + containerManager.handle(new CMgrSignalContainersEvent(reqs)); + + verify(executorSpy).signalContainer(anyString(), anyString(), any(Signal.class)); + + StopContainerRequest stopRequest = recordFactory.newRecordInstance(StopContainerRequest.class); + stopRequest.setContainerId(cId); + containerManager.stopContainer(stopRequest); + + BaseContainerManagerTest.waitForContainerState(containerManager, cId, + ContainerState.COMPLETE); + + GetContainerStatusRequest gcsRequest = + recordFactory.newRecordInstance(GetContainerStatusRequest.class); + gcsRequest.setContainerId(cId); + ContainerStatus containerStatus = + containerManager.getContainerStatus(gcsRequest).getStatus(); + Assert.assertEquals(ExitCode.TERMINATED.getExitCode(), + containerStatus.getExitStatus()); + + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java index f8894c7..66631a8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java @@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState; @@ -124,4 +125,8 @@ public String localizationCountersAsString() { public void handle(ContainerEvent event) { } + @Override + public void Signal(Signal signal) { + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java index ec4053c..ea28bd2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java @@ -62,12 +62,15 @@ import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest; import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; +import 
org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.DelegationToken; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.QueueInfo; @@ -85,7 +88,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppKillEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeSignalContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider; @@ -580,4 +585,66 @@ private boolean isAllowedDelegationTokenOp() throws IOException { return true; } } + + /** + * Signal a container. + * After the request passes some sanity check, it will be delivered + * to RMNodeImpl so that the next NM heartbeat will pick up the signal request + */ + @Override + public SignalContainerResponse signalContainer( + SignalContainerRequest request) throws YarnRemoteException { + ContainerId containerId = request.getContainerId(); + + UserGroupInformation callerUGI; + try { + callerUGI = UserGroupInformation.getCurrentUser(); + } catch (IOException ie) { + LOG.info("Error getting UGI ", ie); + throw RPCUtil.getRemoteException(ie); + } + + ApplicationId applicationId = containerId.getApplicationAttemptId(). 
+ getApplicationId(); + RMApp application = this.rmContext.getRMApps().get(applicationId); + if (application == null) { + RMAuditLogger.logFailure(callerUGI.getUserName(), + AuditConstants.SIGNAL_CONTAINER, "UNKNOWN", "ClientRMService", + "Trying to signal an absent container", applicationId, containerId); + throw RPCUtil + .getRemoteException("Trying to signal an absent container " + + containerId); + } + + if (!checkAccess(callerUGI, application.getUser(), + ApplicationAccessType.MODIFY_APP, applicationId)) { + RMAuditLogger.logFailure(callerUGI.getShortUserName(), + AuditConstants.SIGNAL_CONTAINER, "User doesn't have permissions to " + + ApplicationAccessType.MODIFY_APP.toString(), "ClientRMService", + AuditConstants.UNAUTHORIZED_USER, applicationId); + throw RPCUtil.getRemoteException(new AccessControlException("User " + + callerUGI.getShortUserName() + " cannot perform operation " + + ApplicationAccessType.MODIFY_APP.name() + " on " + applicationId)); + } + + RMContainer container = scheduler.getRMContainer(containerId); + if (container != null) { + this.rmContext.getDispatcher().getEventHandler().handle( + new RMNodeSignalContainerEvent(container.getContainer().getNodeId(), + request)); + RMAuditLogger.logSuccess(callerUGI.getShortUserName(), + AuditConstants.SIGNAL_CONTAINER, "ClientRMService", applicationId, + containerId); + } else { + RMAuditLogger.logFailure(callerUGI.getUserName(), + AuditConstants.SIGNAL_CONTAINER, "UNKNOWN", "ClientRMService", + "Trying to signal an absent container", applicationId, containerId); + throw RPCUtil + .getRemoteException("Trying to signal an absent container " + + containerId); + } + + return recordFactory + .newRecordInstance(SignalContainerResponse.class); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java index b9261ca..3a1889d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java @@ -51,6 +51,7 @@ public static final String REGISTER_AM = "Register App Master"; public static final String ALLOC_CONTAINER = "AM Allocated Container"; public static final String RELEASE_CONTAINER = "AM Released Container"; + public static final String SIGNAL_CONTAINER = "Signal Container Request"; // Some commonly used descriptions public static final String UNAUTHORIZED_USER = "Unauthorized user"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java index ef644be..f59454a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java
@@ -36,6 +36,7 @@
   // Source: Container
   CONTAINER_ALLOCATED,
   CLEANUP_CONTAINER,
+  SIGNAL_CONTAINER,
 
   // Source: NMLivelinessMonitor
   EXPIRE
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index 7b9b8b1..c1d1da5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -35,6 +35,7 @@
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.net.Node;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
@@ -104,6 +105,10 @@
   private final Set<ContainerId> containersToClean = new TreeSet<ContainerId>(
       new ContainerIdComparator());
 
+  /* set of containers that need to be signaled */
+  private final List<SignalContainerRequest> containersToSignal =
+      new ArrayList<SignalContainerRequest>();
+
   /* the list of applications that have finished and need to be purged */
   private final List<ApplicationId> finishedApplications =
       new ArrayList<ApplicationId>();
 
@@ -124,7 +129,7 @@
      RMNodeEventType.STARTED, new AddNodeTransition())
 
      //Transitions from RUNNING state
-     .addTransition(NodeState.RUNNING, 
+     .addTransition(NodeState.RUNNING,
          EnumSet.of(NodeState.RUNNING, NodeState.UNHEALTHY),
          RMNodeEventType.STATUS_UPDATE, new StatusUpdateWhenHealthyTransition())
      .addTransition(NodeState.RUNNING, NodeState.DECOMMISSIONED,
@@ -142,9 +147,11 @@
          RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition())
      .addTransition(NodeState.RUNNING, NodeState.RUNNING,
          RMNodeEventType.RECONNECTED, new ReconnectNodeTransition())
+     .addTransition(NodeState.RUNNING, NodeState.RUNNING,
+         RMNodeEventType.SIGNAL_CONTAINER, new SignalContainerTransition())
 
      //Transitions from UNHEALTHY state
-     .addTransition(NodeState.UNHEALTHY, 
+     .addTransition(NodeState.UNHEALTHY,
          EnumSet.of(NodeState.UNHEALTHY, NodeState.RUNNING),
          RMNodeEventType.STATUS_UPDATE, new StatusUpdateWhenUnHealthyTransition())
      .addTransition(NodeState.UNHEALTHY, NodeState.DECOMMISSIONED,
@@ -162,7 +169,9 @@
          RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition())
      .addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY,
          RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition())
-     
+     .addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY,
+         RMNodeEventType.SIGNAL_CONTAINER, new SignalContainerTransition())
+
      // create the topology tables
      .installTopology();
 
@@ -311,8 +320,10 @@ public void updateHeartbeatResponseForCleanup(HeartbeatResponse response) {
       response.addAllContainersToCleanup(
           new ArrayList<ContainerId>(this.containersToClean));
       response.addAllApplicationsToCleanup(this.finishedApplications);
+      response.addAllContainersToSignal(this.containersToSignal);
       this.containersToClean.clear();
       this.finishedApplications.clear();
+      this.containersToSignal.clear();
     } finally {
       this.writeLock.unlock();
     }
@@ -469,6 +480,16 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
     }
   }
 
+  public static class SignalContainerTransition implements
+      SingleArcTransition<RMNodeImpl, RMNodeEvent> {
+
+    @Override
+    public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
+      rmNode.containersToSignal.add(((
+          RMNodeSignalContainerEvent) event).getSignalRequest());
+    }
+  }
+
   public static class DeactivateNodeTransition
     implements SingleArcTransition<RMNodeImpl, RMNodeEvent> {
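Taken together with the ClientRMService change above, this closes the loop on the ResourceManager side: a client request becomes an RMNodeSignalContainerEvent, RMNodeImpl queues it, and the next NodeManager heartbeat response carries it out to the node. A client-side sketch (illustration only, not part of the patch; it mirrors the MockRM helper added further down):

    import org.apache.hadoop.yarn.api.ClientRMProtocol;
    import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
    import org.apache.hadoop.yarn.api.records.ContainerId;
    import org.apache.hadoop.yarn.util.Records;

    public class SignalContainerClientSketch {
      // Ask the RM to deliver SIGQUIT (3) to a running container, e.g. to get a thread dump.
      static void requestThreadDump(ClientRMProtocol client, ContainerId containerId)
          throws Exception {
        SignalContainerRequest req = Records.newRecord(SignalContainerRequest.class);
        req.setContainerId(containerId);
        req.setSignal(3); // delivered to the NM on its next heartbeat
        client.signalContainer(req);
      }
    }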
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeSignalContainerEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeSignalContainerEvent.java
new file mode 100644
index 0000000..9f5ecf0
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeSignalContainerEvent.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
+
+public class RMNodeSignalContainerEvent extends RMNodeEvent {
+
+  private SignalContainerRequest signalRequest;
+
+  public RMNodeSignalContainerEvent(NodeId nodeId,
+      SignalContainerRequest signalRequest) {
+    super(nodeId, RMNodeEventType.SIGNAL_CONTAINER);
+    this.signalRequest = signalRequest;
+  }
+
+  public SignalContainerRequest getSignalRequest() {
+    return this.signalRequest;
+  }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
index f084649..5e3cb71 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
@@ -33,6 +33,7 @@
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 
 /**
@@ -130,4 +131,14 @@ public QueueInfo getQueueInfo(String queueName, boolean includeChildQueues,
   @LimitedPrivate("yarn")
   @Evolving
   QueueMetrics getRootQueueMetrics();
+
+  /**
+   * Get a specific live container based on the container id
+   * @param containerId the container id
+   * @return live container
+   */
+  @LimitedPrivate("yarn")
+  @Evolving
+  RMContainer getRMContainer(ContainerId containerId);
+
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 2fc7540..d03bdbe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -800,7 +800,8 @@ FiCaSchedulerNode getNode(NodeId nodeId) {
     return nodes.get(nodeId);
   }
 
-  private RMContainer getRMContainer(ContainerId containerId) {
+  @Override
+  public RMContainer getRMContainer(ContainerId containerId) {
     FiCaSchedulerApp application =
       getApplication(containerId.getApplicationAttemptId());
     return (application == null) ?
         null : application.getRMContainer(containerId);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 310e376..50e54a8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -177,7 +177,8 @@ public QueueManager getQueueManager() {
     return queueMgr;
   }
 
-  private RMContainer getRMContainer(ContainerId containerId) {
+  @Override
+  public RMContainer getRMContainer(ContainerId containerId) {
     FSSchedulerApp application =
       applications.get(containerId.getApplicationAttemptId());
     return (application == null) ? null : application.getRMContainer(containerId);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
index 3f25537..992c51d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
@@ -781,8 +781,9 @@ public synchronized SchedulerNodeReport getNodeReport(NodeId nodeId) {
     FiCaSchedulerNode node = getNode(nodeId);
     return node == null ? null : new SchedulerNodeReport(node);
   }
-
-  private RMContainer getRMContainer(ContainerId containerId) {
+
+  @Override
+  public RMContainer getRMContainer(ContainerId containerId) {
     FiCaSchedulerApp application =
       getApplication(containerId.getApplicationAttemptId());
     return (application == null) ?
         null : application.getRMContainer(containerId);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index 12391c6..7011357 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -29,12 +29,15 @@
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
@@ -239,6 +242,16 @@ public void killApp(ApplicationId appId) throws Exception {
     client.forceKillApplication(req);
   }
 
+  public void signalContainer(ContainerId containerId, int signal)
+      throws Exception {
+    ClientRMProtocol client = getClientRMService();
+    SignalContainerRequest req = Records
+        .newRecord(SignalContainerRequest.class);
+    req.setContainerId(containerId);
+    req.setSignal(signal);
+    client.signalContainer(req);
+  }
+
   // from AMLauncher
   public MockAM sendAMLaunched(ApplicationAttemptId appAttemptId)
       throws Exception {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestSignalContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestSignalContainer.java
new file mode 100644
index 0000000..01dca9f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestSignalContainer.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.junit.Test;
+
+public class TestSignalContainer {
+
+  private static final Log LOG = LogFactory
+      .getLog(TestSignalContainer.class);
+
+  @Test
+  public void testSignalContainer() throws Exception {
+    Logger rootLogger = LogManager.getRootLogger();
+    rootLogger.setLevel(Level.DEBUG);
+    MockRM rm = new MockRM();
+    rm.start();
+
+    MockNM nm1 = rm.registerNode("h1:1234", 5000);
+
+    RMApp app = rm.submitApp(2000);
+
+    //kick the scheduling
+    nm1.nodeHeartbeat(true);
+
+    RMAppAttempt attempt = app.getCurrentAppAttempt();
+    MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
+    am.registerAppAttempt();
+
+    //request for containers
+    int request = 2;
+    am.allocate("h1", 1000, request,
+        new ArrayList<ContainerId>());
+
+    //kick the scheduler
+    nm1.nodeHeartbeat(true);
+    List<Container> conts = am.allocate(new ArrayList<ResourceRequest>(),
+        new ArrayList<ContainerId>()).getAllocatedContainers();
+    int contReceived = conts.size();
+    int waitCount = 0;
+    while (contReceived < request && waitCount++ < 200) {
+      LOG.info("Got " + contReceived + " containers. Waiting to get "
+          + request);
+      Thread.sleep(100);
+      conts = am.allocate(new ArrayList<ResourceRequest>(),
+          new ArrayList<ContainerId>()).getAllocatedContainers();
+      contReceived += conts.size();
+    }
+    Assert.assertEquals(request, contReceived);
+
+    for (Container container : conts) {
+      rm.signalContainer(container.getId(), 3);
+    }
+
+    HeartbeatResponse resp = nm1.nodeHeartbeat(true);
+    List<SignalContainerRequest> contsToSignal = resp.getContainersToSignalList();
+    int signaledConts = contsToSignal.size();
+
+    waitCount = 0;
+    while (signaledConts < 2 && waitCount++ < 200) {
+      LOG.info("Waiting to get signalcontainer events.. signaledConts: "
+          + signaledConts);
+      Thread.sleep(100);
+      resp = nm1.nodeHeartbeat(true);
+      contsToSignal = resp.getContainersToSignalList();
+      signaledConts += contsToSignal.size();
+    }
+
+    Assert.assertEquals(2, signaledConts);
+
+    am.unregisterAppAttempt();
+    nm1.nodeHeartbeat(attempt.getAppAttemptId(), 1,
+        ContainerState.COMPLETE);
+    am.waitForState(RMAppAttemptState.FINISHED);
+
+    rm.stop();
+  }
+
+  public static void main(String[] args) throws Exception {
+    TestSignalContainer t = new TestSignalContainer();
+    t.testSignalContainer();
+  }
+}
\ No newline at end of file
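
Note: the RM-side RPC handler that ties the pieces above together (accepting a SignalContainerRequest, locating the container through the new YarnScheduler.getRMContainer() lookup, and queuing an RMNodeSignalContainerEvent so the next NodeManager heartbeat carries the signal) is not part of the hunks shown here. The following is only a minimal sketch of what such a handler might look like, assuming it lives in ClientRMService with access to the scheduler and the RMContext; the helper signature, the error handling via RPCUtil, and the response construction via Records are illustrative assumptions, not the patch's actual code.

  // Illustrative sketch only -- not taken from the patch. Assumes the standard
  // RM dispatcher wiring; names and error handling are hypothetical.
  static SignalContainerResponse signalContainer(YarnScheduler scheduler,
      RMContext rmContext, SignalContainerRequest request)
      throws YarnRemoteException {
    ContainerId containerId = request.getContainerId();
    // Look up the live container via the method this patch adds to YarnScheduler.
    RMContainer rmContainer = scheduler.getRMContainer(containerId);
    if (rmContainer == null) {
      throw RPCUtil.getRemoteException(
          "Container " + containerId + " is not currently running");
    }
    // Queue the request on the container's node. SignalContainerTransition adds
    // it to containersToSignal, and the next NM heartbeat returns it through
    // HeartbeatResponse#getContainersToSignalList().
    rmContext.getDispatcher().getEventHandler().handle(
        new RMNodeSignalContainerEvent(
            rmContainer.getContainer().getNodeId(), request));
    return Records.newRecord(SignalContainerResponse.class);
  }

TestSignalContainer above exercises this path end to end: MockRM.signalContainer() drives the RPC, and the test then polls nm1.nodeHeartbeat(true) until getContainersToSignalList() returns the queued requests.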