diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SignalContainerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SignalContainerRequest.java
new file mode 100644
index 0000000..c9741e5
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SignalContainerRequest.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.protocolrecords;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Stable;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * <p>The request sent by the client to the <code>ResourceManager</code>
+ * to signal a container.</p>
+ *
+ * <p>A request names the target container, the
+ * {@link SignalContainerCommand} to deliver and a diagnostics string.</p>
+ */
+@Public
+@Stable
+public abstract class SignalContainerRequest {
+
+  /**
+   * Build a request record populated with the given values.
+   *
+   * @param containerId id of the container to signal
+   * @param signalContainerCommand command of the signal request
+   * @param diagnostics diagnostics string of the signal request
+   * @return a new record obtained from {@link Records#newRecord(Class)}
+   *         with the three values applied through the setters
+   */
+  @Public
+  @Stable
+  public static SignalContainerRequest newInstance(ContainerId containerId,
+      SignalContainerCommand signalContainerCommand,
+      String diagnostics) {
+    final SignalContainerRequest signalRequest =
+        Records.newRecord(SignalContainerRequest.class);
+    signalRequest.setContainerId(containerId);
+    signalRequest.setCommand(signalContainerCommand);
+    signalRequest.setDiagnostics(diagnostics);
+    return signalRequest;
+  }
+
+  /**
+   * Get the <code>ContainerId</code> of the container to signal.
+   *
+   * @return <code>ContainerId</code> of the container to signal
+   */
+  @Public
+  @Stable
+  public abstract ContainerId getContainerId();
+
+  /**
+   * Set the <code>ContainerId</code> of the container to signal.
+   *
+   * @param containerId <code>ContainerId</code> of the container to signal
+   */
+  @Public
+  @Stable
+  public abstract void setContainerId(ContainerId containerId);
+
+  /**
+   * Get the <code>SignalContainerCommand</code> of the signal request.
+   *
+   * @return <code>SignalContainerCommand</code> of the signal request
+   */
+  @Public
+  @Stable
+  public abstract SignalContainerCommand getCommand();
+
+  /**
+   * Set the <code>SignalContainerCommand</code> of the signal request.
+   *
+   * @param command <code>SignalContainerCommand</code> of the signal request
+   */
+  @Public
+  @Stable
+  public abstract void setCommand(SignalContainerCommand command);
+
+  /**
+   * Get the diagnostics string of the signal request.
+   *
+   * @return diagnostics string of the signal request
+   */
+  @Public
+  @Stable
+  public abstract String getDiagnostics();
+
+  /**
+   * Set the diagnostics string of the signal request.
+   *
+   * @param diagnostics diagnostics string of the signal request
+   */
+  @Public
+  @Stable
+  public abstract void setDiagnostics(String diagnostics);
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SignalContainerResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SignalContainerResponse.java
new file mode 100644
index 0000000..f5255b9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SignalContainerResponse.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.protocolrecords;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * <p>The response sent by the <code>ResourceManager</code> to the client
+ * signalling a container.</p>
+ *
+ * <p>It carries a single flag telling whether the signal command was
+ * submitted.</p>
+ */
+@Public
+@Unstable
+public abstract class SignalContainerResponse {
+
+  /**
+   * Build a response record carrying the given flag.
+   *
+   * @param isCommandSubmitted whether the signal command is submitted
+   * @return a new record obtained from {@link Records#newRecord(Class)}
+   *         with the flag applied through the setter
+   */
+  @Public
+  @Unstable
+  public static SignalContainerResponse newInstance(
+      boolean isCommandSubmitted) {
+    final SignalContainerResponse signalResponse =
+        Records.newRecord(SignalContainerResponse.class);
+    signalResponse.setIsCommandSubmitted(isCommandSubmitted);
+    return signalResponse;
+  }
+
+  /**
+   * Get the flag which indicates whether the signal command is submitted.
+   *
+   * @return <code>true</code> if the signal command is submitted,
+   *         <code>false</code> otherwise
+   */
+  @Public
+  @Unstable
+  public abstract boolean getIsCommandSubmitted();
+
+  /**
+   * Set the flag which indicates whether the signal command is submitted.
+   *
+   * @param isCommandSubmitted <code>true</code> if the signal command is
+   *          submitted, <code>false</code> otherwise
+   */
+  @Public
+  @Unstable
+  public abstract void setIsCommandSubmitted(boolean isCommandSubmitted);
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/SignalContainerCommand.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/SignalContainerCommand.java
new file mode 100644
index 0000000..2554aed
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/SignalContainerCommand.java
@@ -0,0 +1,67 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.api.records;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Stable;
+
+/**
+ * Enumeration of various signal container commands.
+ *
+ * The protobuf record implementation converts between this enum and
+ * SignalContainerCommandProto by constant name, so the constant names of
+ * the two enums must stay identical.
+ */
+@Public
+@Stable
+public enum SignalContainerCommand {
+
+ /**
+ * Null command.
+ * On Linux, it is equivalent to signal 0.
+ */
+ NULL,
+
+ /**
+ * Used to capture thread dump.
+ * On Linux, it is equivalent to SIGQUIT.
+ */
+ OUTPUT_THREAD_DUMP,
+
+ /** Gracefully pause a container.
+ * On Linux, it is equivalent to SIGTSTP.
+ */
+ GRACEFUL_PAUSE,
+
+ /** Forcefully pause a container.
+ * On Linux, it is equivalent to SIGSTOP.
+ */
+ FORCEFUL_PAUSE,
+
+ /** Resume a paused container.
+ * On Linux, it is equivalent to SIGCONT.
+ */
+ RESUME,
+
+ /** Gracefully shutdown a container.
+ * On Linux, it is equivalent to SIGTERM.
+ */
+ GRACEFUL_SHUTDOWN,
+
+ /** Forcefully shutdown a container.
+ * On Linux, it is equivalent to SIGKILL.
+ */
+ FORCEFUL_SHUTDOWN,
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index 48aac9d..abf949d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -329,6 +329,16 @@ message QueueUserACLInfoProto {
repeated QueueACLProto userAcls = 2;
}
+// Wire form of the Java enum SignalContainerCommand.
+// The Java record implementation converts between the two by constant
+// name, so the names here must match the Java constants exactly.
+enum SignalContainerCommandProto {
+ NULL = 1;
+ OUTPUT_THREAD_DUMP = 2;
+ GRACEFUL_PAUSE = 3;
+ FORCEFUL_PAUSE = 4;
+ RESUME = 5;
+ GRACEFUL_SHUTDOWN = 6;
+ FORCEFUL_SHUTDOWN = 7;
+}
+
////////////////////////////////////////////////////////////////////////
////// From container_manager //////////////////////////////////////////
////////////////////////////////////////////////////////////////////////
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
index a1f6d2e..d358e25 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
@@ -187,6 +187,15 @@ message GetQueueUserAclsInfoResponseProto {
repeated QueueUserACLInfoProto queueUserAcls = 1;
}
+// Request to signal a container: the container to signal, the command of
+// the signal request and its diagnostics string.
+message SignalContainerRequestProto {
+ optional ContainerIdProto container_id = 1;
+ optional SignalContainerCommandProto command = 2;
+ optional string diagnostics = 3;
+}
+
+// Response to a signal-container request.
+message SignalContainerResponseProto {
+ // Indicates whether the signal command is submitted; false when unset.
+ optional bool is_command_submitted = 1 [default = false];
+}
//////////////////////////////////////////////////////
/////// client_NM_Protocol ///////////////////////////
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SignalContainerRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SignalContainerRequestPBImpl.java
new file mode 100644
index 0000000..4b552c3
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SignalContainerRequestPBImpl.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
+
+
+import com.google.protobuf.TextFormat;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
+import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
+import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.SignalContainerCommandProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProtoOrBuilder;
+
+
+/**
+ * Protocol-buffer backed implementation of {@link SignalContainerRequest}.
+ *
+ * <p>The record state lives either in the immutable {@code proto} or in the
+ * mutable {@code builder}; {@code viaProto} tells which of the two is
+ * current. Values handed to the setters are held in local fields and are
+ * copied into the builder when {@link #getProto()} materialises the
+ * message.</p>
+ */
+public class SignalContainerRequestPBImpl
+    extends SignalContainerRequest {
+  SignalContainerRequestProto proto =
+      SignalContainerRequestProto.getDefaultInstance();
+  SignalContainerRequestProto.Builder builder = null;
+  // true: 'proto' holds the current state; false: 'builder' does.
+  boolean viaProto = false;
+
+  // Locally held values; merged into the builder by mergeLocalToBuilder().
+  private ContainerId containerId;
+  private SignalContainerCommand command = null;
+  private String diagnostics;
+
+  /** Convert a proto command to the API enum; the mapping is by name. */
+  private static SignalContainerCommand convertFromProtoFormat(
+      SignalContainerCommandProto p) {
+    return SignalContainerCommand.valueOf(p.name());
+  }
+
+  /** Convert an API command to the proto enum; the mapping is by name. */
+  private static SignalContainerCommandProto convertToProtoFormat(
+      SignalContainerCommand p) {
+    return SignalContainerCommandProto.valueOf(p.name());
+  }
+
+  /** Create an empty request backed by a fresh builder. */
+  public SignalContainerRequestPBImpl() {
+    builder = SignalContainerRequestProto.newBuilder();
+  }
+
+  /**
+   * Wrap an existing message.
+   *
+   * @param proto the message that becomes the current state of this record
+   */
+  public SignalContainerRequestPBImpl(SignalContainerRequestProto proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+
+  /**
+   * Merge the locally held values into the message and return it.
+   *
+   * @return the message reflecting the current state of this record
+   */
+  public SignalContainerRequestProto getProto() {
+    mergeLocalToProto();
+    if (!viaProto) {
+      proto = builder.build();
+    }
+    viaProto = true;
+    return proto;
+  }
+
+  /** Copy every non-null locally held value into the builder. */
+  private void mergeLocalToBuilder() {
+    if (this.containerId != null) {
+      builder.setContainerId(convertToProtoFormat(this.containerId));
+    }
+
+    if (this.command != null) {
+      builder.setCommand(convertToProtoFormat(this.command));
+    }
+
+    if (diagnostics != null) {
+      builder.setDiagnostics(diagnostics);
+    }
+  }
+
+  /** Fold the locally held values into a freshly built {@code proto}. */
+  private void mergeLocalToProto() {
+    if (viaProto) {
+      maybeInitBuilder();
+    }
+    mergeLocalToBuilder();
+    proto = builder.build();
+    viaProto = true;
+  }
+
+  /**
+   * Make the builder the current state, seeding it from {@code proto} when
+   * the proto was current or no builder exists yet.
+   */
+  private void maybeInitBuilder() {
+    if (viaProto || builder == null) {
+      builder = SignalContainerRequestProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+
+  @Override
+  public int hashCode() {
+    return getProto().hashCode();
+  }
+
+  /**
+   * Two records are equal when they are of the same class and their
+   * messages are equal.
+   */
+  @Override
+  public boolean equals(Object other) {
+    // Compare exact classes. An isAssignableFrom() test would also accept an
+    // instance of a supertype (e.g. a plain Object), and casting that
+    // instance to this class would throw ClassCastException.
+    if (other == null || other.getClass() != this.getClass()) {
+      return false;
+    }
+    return this.getProto().equals(
+        ((SignalContainerRequestPBImpl) other).getProto());
+  }
+
+  @Override
+  public String toString() {
+    return TextFormat.shortDebugString(getProto());
+  }
+
+  @Override
+  public ContainerId getContainerId() {
+    if (this.containerId != null) {
+      return this.containerId;
+    }
+    SignalContainerRequestProtoOrBuilder p = viaProto ? proto : builder;
+    if (!p.hasContainerId()) {
+      return null;
+    }
+    // Cache the converted id so later calls return the same instance.
+    this.containerId = convertFromProtoFormat(p.getContainerId());
+    return this.containerId;
+  }
+
+  @Override
+  public void setContainerId(ContainerId containerId) {
+    maybeInitBuilder();
+    if (containerId == null) {
+      // A null local value is skipped on merge, so clear the field here.
+      builder.clearContainerId();
+    }
+    this.containerId = containerId;
+  }
+
+  /** Load the command from the message unless one is already held locally. */
+  private void initCommand() {
+    if (this.command != null) {
+      return;
+    }
+    SignalContainerRequestProtoOrBuilder p = viaProto ? proto : builder;
+    if (p.hasCommand()) {
+      this.command = convertFromProtoFormat(p.getCommand());
+    }
+  }
+
+  @Override
+  public SignalContainerCommand getCommand() {
+    initCommand();
+    return command;
+  }
+
+  @Override
+  public void setCommand(SignalContainerCommand command) {
+    maybeInitBuilder();
+    if (command == null) {
+      // A null local value is skipped on merge, so clear the field here.
+      builder.clearCommand();
+    }
+    this.command = command;
+  }
+
+  @Override
+  public String getDiagnostics() {
+    if (diagnostics != null) {
+      return diagnostics;
+    }
+    final SignalContainerRequestProtoOrBuilder p = viaProto ? proto : builder;
+    if (p.hasDiagnostics()) {
+      diagnostics = p.getDiagnostics();
+    }
+    return diagnostics;
+  }
+
+  @Override
+  public void setDiagnostics(String diagnostics) {
+    maybeInitBuilder();
+    if (diagnostics == null) {
+      // A null local value is skipped on merge, so clear the field here.
+      builder.clearDiagnostics();
+    }
+    this.diagnostics = diagnostics;
+  }
+
+  private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) {
+    return new ContainerIdPBImpl(p);
+  }
+
+  // The id is expected to be a ContainerIdPBImpl; any other implementation
+  // fails the cast.
+  private ContainerIdProto convertToProtoFormat(ContainerId t) {
+    return ((ContainerIdPBImpl)t).getProto();
+  }
+
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SignalContainerResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SignalContainerResponsePBImpl.java
new file mode 100644
index 0000000..d3a9ba8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SignalContainerResponsePBImpl.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
+
+
+import com.google.protobuf.TextFormat;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerResponseProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerResponseProtoOrBuilder;
+
+
+/**
+ * Protocol-buffer backed implementation of {@link SignalContainerResponse}.
+ *
+ * <p>The record state lives either in the immutable {@code proto} or in the
+ * mutable {@code builder}; {@code viaProto} tells which of the two is
+ * current.</p>
+ */
+public class SignalContainerResponsePBImpl
+    extends SignalContainerResponse {
+  SignalContainerResponseProto proto =
+      SignalContainerResponseProto.getDefaultInstance();
+  SignalContainerResponseProto.Builder builder = null;
+  // true: 'proto' holds the current state; false: 'builder' does.
+  boolean viaProto = false;
+
+  /** Create an empty response backed by a fresh builder. */
+  public SignalContainerResponsePBImpl() {
+    builder = SignalContainerResponseProto.newBuilder();
+  }
+
+  /**
+   * Wrap an existing message.
+   *
+   * @param proto the message that becomes the current state of this record
+   */
+  public SignalContainerResponsePBImpl(SignalContainerResponseProto proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+
+  /**
+   * Build the message from the builder if needed and return it.
+   *
+   * @return the message reflecting the current state of this record
+   */
+  public SignalContainerResponseProto getProto() {
+    proto = viaProto ? proto : builder.build();
+    viaProto = true;
+    return proto;
+  }
+
+  /**
+   * Make the builder the current state, seeding it from {@code proto} when
+   * the proto was current or no builder exists yet.
+   */
+  private void maybeInitBuilder() {
+    if (viaProto || builder == null) {
+      builder = SignalContainerResponseProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+
+  @Override
+  public int hashCode() {
+    return getProto().hashCode();
+  }
+
+  /**
+   * Two records are equal when they are of the same class and their
+   * messages are equal.
+   */
+  @Override
+  public boolean equals(Object other) {
+    // Compare exact classes. An isAssignableFrom() test would also accept an
+    // instance of a supertype (e.g. a plain Object), and casting that
+    // instance to this class would throw ClassCastException.
+    if (other == null || other.getClass() != this.getClass()) {
+      return false;
+    }
+    return this.getProto().equals(
+        ((SignalContainerResponsePBImpl) other).getProto());
+  }
+
+  @Override
+  public String toString() {
+    return TextFormat.shortDebugString(getProto());
+  }
+
+  @Override
+  public boolean getIsCommandSubmitted() {
+    SignalContainerResponseProtoOrBuilder p = viaProto ? proto : builder;
+    return p.getIsCommandSubmitted();
+  }
+
+  @Override
+  public void setIsCommandSubmitted(boolean isCommandSubmitted) {
+    maybeInitBuilder();
+    builder.setIsCommandSubmitted(isCommandSubmitted);
+  }
+
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
index 38dfa58..50b6412 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
@@ -20,6 +20,7 @@
import java.util.List;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
@@ -38,18 +39,22 @@
MasterKey getContainerTokenMasterKey();
void setContainerTokenMasterKey(MasterKey secretKey);
-
+
MasterKey getNMTokenMasterKey();
void setNMTokenMasterKey(MasterKey secretKey);
void addAllContainersToCleanup(List containers);
-
+
void addAllApplicationsToCleanup(List applications);
long getNextHeartBeatInterval();
void setNextHeartBeatInterval(long nextHeartBeatInterval);
-
+
String getDiagnosticsMessage();
void setDiagnosticsMessage(String diagnosticsMessage);
+
+  /**
+   * Get the signal requests carried by this heartbeat response.
+   *
+   * @return the list of containers to signal
+   */
+  List<SignalContainerRequest> getContainersToSignalList();
+
+  /**
+   * Add signal requests to this heartbeat response.
+   *
+   * @param containers the signal requests to add
+   */
+  void addAllContainersToSignal(List<SignalContainerRequest> containers);
+
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
index 775f95a..e99c66b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
@@ -21,7 +21,8 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Iterator;
-
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerRequestPBImpl;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
@@ -33,6 +34,7 @@
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProtoOrBuilder;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProto;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeAction;
@@ -49,7 +51,8 @@
private List applicationsToCleanup = null;
private MasterKey containerTokenMasterKey = null;
private MasterKey nmTokenMasterKey = null;
-
+ private List containersToSignal = null;
+
public NodeHeartbeatResponsePBImpl() {
builder = NodeHeartbeatResponseProto.newBuilder();
}
@@ -81,6 +84,9 @@ private void mergeLocalToBuilder() {
builder.setNmTokenMasterKey(
convertToProtoFormat(this.nmTokenMasterKey));
}
+ if (this.containersToSignal != null) {
+ addContainersToSignalToProto();
+ }
}
private void mergeLocalToProto() {
@@ -362,5 +368,75 @@ private MasterKeyPBImpl convertFromProtoFormat(MasterKeyProto p) {
private MasterKeyProto convertToProtoFormat(MasterKey t) {
return ((MasterKeyPBImpl) t).getProto();
}
+
+  /**
+   * Get the signal requests of this response, loading them from the
+   * message on first access.
+   *
+   * @return the internal list itself, not a copy
+   */
+  @Override
+  public List<SignalContainerRequest> getContainersToSignalList() {
+    initContainersToSignal();
+    return this.containersToSignal;
+  }
+
+  /**
+   * Populate the local list of signal requests from the current message,
+   * unless the list has already been created.
+   */
+  private void initContainersToSignal() {
+    if (this.containersToSignal != null) {
+      return;
+    }
+    NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
+    // Typed locals let the compiler pick the right convertFromProtoFormat
+    // overload for each element.
+    List<SignalContainerRequestProto> list = p.getContainersToSignalList();
+    List<SignalContainerRequest> requests =
+        new ArrayList<SignalContainerRequest>(list.size());
+
+    for (SignalContainerRequestProto c : list) {
+      requests.add(convertFromProtoFormat(c));
+    }
+    this.containersToSignal = requests;
+  }
+
+  /**
+   * Append the given signal requests to the local list, creating the list
+   * from the current message first if necessary. A null argument is ignored.
+   */
+  @Override
+  public void addAllContainersToSignal(
+      final List containersToSignal) {
+    if (containersToSignal != null) {
+      initContainersToSignal();
+      this.containersToSignal.addAll(containersToSignal);
+    }
+  }
+
+  /**
+   * Replace the signal requests held by the builder with the contents of
+   * the local list. When the local list is null the builder's entries are
+   * only cleared.
+   */
+  private void addContainersToSignalToProto() {
+    maybeInitBuilder();
+    builder.clearContainersToSignal();
+    if (containersToSignal == null) {
+      return;
+    }
+
+    // Typed view of the local list so each element resolves to the
+    // SignalContainerRequest overload of convertToProtoFormat.
+    final List<SignalContainerRequest> requests = containersToSignal;
+    final List<SignalContainerRequestProto> protos =
+        new ArrayList<SignalContainerRequestProto>(requests.size());
+    for (SignalContainerRequest request : requests) {
+      protos.add(convertToProtoFormat(request));
+    }
+    builder.addAllContainersToSignal(protos);
+  }
+
+  /** Wrap a signal-request message in its record implementation. */
+  private SignalContainerRequestPBImpl convertFromProtoFormat(
+      SignalContainerRequestProto p) {
+    final SignalContainerRequestPBImpl wrapped =
+        new SignalContainerRequestPBImpl(p);
+    return wrapped;
+  }
+
+  /**
+   * Extract the message from a signal request. The request is cast to
+   * SignalContainerRequestPBImpl; any other implementation fails the cast.
+   */
+  private SignalContainerRequestProto convertToProtoFormat(
+      SignalContainerRequest t) {
+    final SignalContainerRequestPBImpl impl = (SignalContainerRequestPBImpl) t;
+    return impl.getProto();
+  }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerBuilderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerBuilderUtils.java
index 8bdff62..a28beb4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerBuilderUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerBuilderUtils.java
@@ -20,6 +20,7 @@
import java.util.List;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.SerializedException;
@@ -43,7 +44,8 @@ public static NodeHeartbeatResponse newNodeHeartbeatResponse(int responseId,
NodeAction action, List containersToCleanUp,
List applicationsToCleanUp,
MasterKey containerTokenMasterKey, MasterKey nmTokenMasterKey,
- long nextHeartbeatInterval) {
+ long nextHeartbeatInterval,
+ List containersToSignal) {
NodeHeartbeatResponse response = recordFactory
.newRecordInstance(NodeHeartbeatResponse.class);
response.setResponseId(responseId);
@@ -57,6 +59,9 @@ public static NodeHeartbeatResponse newNodeHeartbeatResponse(int responseId,
if(applicationsToCleanUp != null) {
response.addAllApplicationsToCleanup(applicationsToCleanUp);
}
+ if (containersToSignal != null) {
+ response.addAllContainersToSignal(containersToSignal);
+ }
return response;
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
index c544905..af9473b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
@@ -24,6 +24,7 @@ package hadoop.yarn;
import "yarn_protos.proto";
import "yarn_server_common_protos.proto";
+import "yarn_service_protos.proto";
message RegisterNodeManagerRequestProto {
optional NodeIdProto node_id = 1;
@@ -57,4 +58,5 @@ message NodeHeartbeatResponseProto {
repeated ApplicationIdProto applications_to_cleanup = 6;
optional int64 nextHeartBeatInterval = 7;
optional string diagnostics_message = 8;
+ repeated SignalContainerRequestProto containers_to_signal = 9;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/CMgrSignalContainersEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/CMgrSignalContainersEvent.java
new file mode 100644
index 0000000..cc967b9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/CMgrSignalContainersEvent.java
@@ -0,0 +1,38 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager;
+
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
+
+/**
+ * Container-manager event of type
+ * {@link ContainerManagerEventType#SIGNAL_CONTAINERS} carrying a list of
+ * signal requests.
+ */
+public class CMgrSignalContainersEvent extends ContainerManagerEvent {
+
+  private final List<SignalContainerRequest> containersToSignal;
+
+  /**
+   * Create the event.
+   *
+   * @param containerToSignal the signal requests carried by this event; the
+   *          list is stored as given, not copied
+   */
+  public CMgrSignalContainersEvent(
+      List<SignalContainerRequest> containerToSignal) {
+    super(ContainerManagerEventType.SIGNAL_CONTAINERS);
+    this.containersToSignal = containerToSignal;
+  }
+
+  /**
+   * Get the signal requests carried by this event.
+   *
+   * @return the list passed to the constructor
+   */
+  public List<SignalContainerRequest> getContainersToSignal() {
+    return this.containersToSignal;
+  }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
index ee72fbc..67559a2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
@@ -35,8 +35,8 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.util.Shell.ShellCommandExecutor;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
@@ -120,7 +120,7 @@ public abstract int launchContainer(Container container,
List logDirs) throws IOException;
public abstract boolean signalContainer(String user, String pid,
- Signal signal)
+ SignalContainerCommand command)
throws IOException;
public abstract void deleteAsUser(String user, Path subDir, Path... basedirs)
@@ -147,15 +147,20 @@ public String toString() {
/**
* The constants for the signals.
+ * Applicable to POSIX-compliant operating systems.
*/
public enum Signal {
- NULL(0, "NULL"), QUIT(3, "SIGQUIT"),
- KILL(9, "SIGKILL"), TERM(15, "SIGTERM");
- private final int value;
+ NULL(0, "NULL", SignalContainerCommand.NULL),
+ QUIT(3, "SIGQUIT", SignalContainerCommand.OUTPUT_THREAD_DUMP),
+ KILL(9, "SIGKILL", SignalContainerCommand.FORCEFUL_SHUTDOWN),
+ TERM(15, "SIGTERM", SignalContainerCommand.GRACEFUL_SHUTDOWN);
+ private final int value; // actual value used in the "kill" command
private final String str;
- private Signal(int value, String str) {
+ private final SignalContainerCommand command; // OS-independent command
+ private Signal(int value, String str, SignalContainerCommand command) {
this.str = str;
this.value = value;
+ this.command = command;
}
public int getValue() {
return value;
@@ -164,6 +169,17 @@ public int getValue() {
public String toString() {
return str;
}
+    /** Maps an OS-independent command to the signal bound to it, else NULL. */
+    public static Signal valueOf(SignalContainerCommand command) {
+      for (Signal candidate : values()) {
+        // Identity comparison against the command bound to each constant.
+        if (candidate.command == command) {
+          return candidate;
+        }
+      }
+      // Nothing is bound to this command (this also covers a null argument).
+      return NULL;
+    }
}
protected void logOutput(String output) {
@@ -297,16 +313,16 @@ public String getProcessId(ContainerId containerID) {
private final String user;
private final String pid;
private final long delay;
- private final Signal signal;
+ private final SignalContainerCommand command;
private final ContainerExecutor containerExecutor;
public DelayedProcessKiller(Container container, String user, String pid,
- long delay, Signal signal, ContainerExecutor containerExecutor) {
+ long delay, SignalContainerCommand command, ContainerExecutor containerExecutor) {
this.container = container;
this.user = user;
this.pid = pid;
this.delay = delay;
- this.signal = signal;
+ this.command = command;
this.containerExecutor = containerExecutor;
setName("Task killer for " + pid);
setDaemon(false);
@@ -315,7 +331,7 @@ public DelayedProcessKiller(Container container, String user, String pid,
public void run() {
try {
Thread.sleep(delay);
- containerExecutor.signalContainer(user, pid, signal);
+ containerExecutor.signalContainer(user, pid, command);
} catch (InterruptedException e) {
return;
} catch (IOException e) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerManagerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerManagerEventType.java
index 4278ce0..8a7e423 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerManagerEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerManagerEventType.java
@@ -21,4 +21,5 @@
public enum ContainerManagerEventType {
FINISH_APPS,
FINISH_CONTAINERS,
+ SIGNAL_CONTAINERS,
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
index 9e2e111..06fee41 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
@@ -43,6 +43,7 @@
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
@@ -312,15 +313,15 @@ public void writeLocalWrapperScript(Path launchDst, Path pidFile,
}
@Override
- public boolean signalContainer(String user, String pid, Signal signal)
- throws IOException {
- LOG.debug("Sending signal " + signal.getValue() + " to pid " + pid
+ public boolean signalContainer(String user, String pid,
+ SignalContainerCommand command) throws IOException {
+ LOG.debug("Sending signal " + command + " to pid " + pid
+ " as user " + user);
if (!containerIsAlive(pid)) {
return false;
}
try {
- killContainer(pid, signal);
+ signalContainer(pid, Signal.valueOf(command));
} catch (IOException e) {
if (!containerIsAlive(pid)) {
return false;
@@ -357,7 +358,7 @@ public static boolean containerIsAlive(String pid) throws IOException {
* @param signal signal to send
* (for logging).
*/
- private void killContainer(String pid, Signal signal) throws IOException {
+ private void signalContainer(String pid, Signal signal) throws IOException {
new ShellCommandExecutor(Shell.getSignalKillCommand(signal.getValue(), pid))
.execute();
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java
index cbdcb13..ffd50af 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java
@@ -37,6 +37,7 @@
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
@@ -317,8 +318,8 @@ public int launchContainer(Container container,
}
@Override
- public boolean signalContainer(String user, String pid, Signal signal)
- throws IOException {
+ public boolean signalContainer(String user, String pid,
+ SignalContainerCommand signalCommand) throws IOException {
verifyUsernamePattern(user);
String runAsUser = getRunAsUser(user);
@@ -329,7 +330,7 @@ public boolean signalContainer(String user, String pid, Signal signal)
user,
Integer.toString(Commands.SIGNAL_CONTAINER.getValue()),
pid,
- Integer.toString(signal.getValue()) };
+ Integer.toString(Signal.valueOf(signalCommand).getValue()) };
ShellCommandExecutor shExec = new ShellCommandExecutor(command);
if (LOG.isDebugEnabled()) {
LOG.debug("signalContainer: " + Arrays.toString(command));
@@ -341,11 +342,11 @@ public boolean signalContainer(String user, String pid, Signal signal)
if (ret_code == ResultCode.INVALID_CONTAINER_PID.getValue()) {
return false;
}
- LOG.warn("Error in signalling container " + pid + " with " + signal
+ LOG.warn("Error in signalling container " + pid + " with " + signalCommand
+ "; exit = " + ret_code, e);
logOutput(shExec.getOutput());
throw new IOException("Problem signalling container " + pid + " with "
- + signal + "; output: " + shExec.getOutput() + " and exitCode: "
+ + signalCommand + "; output: " + shExec.getOutput() + " and exitCode: "
+ ret_code, e);
}
return true;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index 4db000c..a2c66dc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -40,6 +40,7 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.VersionUtil;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
@@ -533,6 +534,18 @@ public void run() {
new CMgrCompletedAppsEvent(appsToCleanup,
CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER));
}
+
+            // A SignalContainer request originates from an application's call
+            // to ApplicationClientProtocol's SignalContainer. After its
+            // authorization check the RM forwards it to this NM; from here it
+            // goes to the ContainerManager, which dispatches to the launcher.
+            List<SignalContainerRequest> containersToSignal =
+                response.getContainersToSignalList();
+            if (containersToSignal != null && !containersToSignal.isEmpty()) {
+              dispatcher.getEventHandler().handle(
+                  new CMgrSignalContainersEvent(containersToSignal));
+            }
+
} catch (ConnectException e) {
//catch and throw the exception if tried MAX wait time to connect RM
dispatcher.getEventHandler().handle(
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index dd3deb3..a73411a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -57,6 +57,7 @@
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
@@ -83,6 +84,7 @@
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedContainersEvent;
+import org.apache.hadoop.yarn.server.nodemanager.CMgrSignalContainersEvent;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.ContainerManagerEvent;
import org.apache.hadoop.yarn.server.nodemanager.Context;
@@ -106,6 +108,7 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.SignalContainersLauncherEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService;
@@ -879,6 +882,23 @@ public void handle(ContainerManagerEvent event) {
"Container Killed by ResourceManager"));
}
break;
+      case SIGNAL_CONTAINERS:
+        // Relay each request to the launcher; skip containers no longer known.
+        CMgrSignalContainersEvent containersSignalEvent =
+            (CMgrSignalContainersEvent) event;
+        for (SignalContainerRequest request : containersSignalEvent
+            .getContainersToSignal()) {
+          ContainerId containerId = request.getContainerId();
+          Container container = this.context.getContainers().get(containerId);
+          if (container == null) {
+            LOG.info("Container " + containerId + " no longer exists");
+            continue;
+          }
+          LOG.info("Container " + containerId + " signal request by ResourceManager");
+          this.dispatcher.getEventHandler().handle(
+              new SignalContainersLauncherEvent(container, request.getCommand()));
+        }
+        break;
default:
throw new YarnRuntimeException(
"Got an unknown ContainerManagerEvent type: " + event.getType());
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
index e252e35..f92466a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
@@ -52,6 +52,7 @@
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.ipc.RPCUtil;
@@ -393,20 +394,21 @@ public void cleanupContainer() throws IOException {
+ " as user " + user
+ " for container " + containerIdStr);
- final Signal signal = sleepDelayBeforeSigKill > 0
- ? Signal.TERM
- : Signal.KILL;
+ final SignalContainerCommand command = sleepDelayBeforeSigKill > 0
+ ? SignalContainerCommand.GRACEFUL_SHUTDOWN
+ : SignalContainerCommand.FORCEFUL_SHUTDOWN;
- boolean result = exec.signalContainer(user, processId, signal);
+ boolean result = exec.signalContainer(user, processId, command);
- LOG.debug("Sent signal " + signal + " to pid " + processId
+ LOG.debug("Sent signal " + command + " to pid " + processId
+ " as user " + user
+ " for container " + containerIdStr
+ ", result=" + (result? "success" : "failed"));
if (sleepDelayBeforeSigKill > 0) {
new DelayedProcessKiller(container, user,
- processId, sleepDelayBeforeSigKill, Signal.KILL, exec).start();
+ processId, sleepDelayBeforeSigKill,
+ SignalContainerCommand.FORCEFUL_SHUTDOWN, exec).start();
}
}
} catch (Exception e) {
@@ -426,6 +428,54 @@ public void cleanupContainer() throws IOException {
}
/**
+   * Send the given command to the container's process, if it was launched.
+   * The pid comes from the pid file; signalling failures are only logged.
+   * @param command OS-independent command handed on to the executor
+   * @throws IOException declared in the signature; the body logs failures
+   */
+  public void signalContainer(SignalContainerCommand command) throws IOException {
+    ContainerId containerId = container.getContainerId();
+    String containerIdStr = ConverterUtils.toString(containerId);
+    String user = container.getUser();
+
+    LOG.info("Sending signal " + command + " to container " + containerIdStr);
+
+    // Only read the launch flag: compareAndSet(false, true) would also flip
+    // it to true whenever a signal request arrives before the launch.
+    if (!shouldLaunchContainer.get()) {
+      LOG.info("Container " + containerIdStr + " not launched."
+          + " Not sending the signal");
+      return;
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Getting pid for container " + containerIdStr
+          + " to send signal to from pid file "
+          + (pidFilePath != null ? pidFilePath.toString() : "null"));
+    }
+
+    try {
+      // The pid file is the only pid source used here; when there is no
+      // file, or no pid could be read from it, nothing is sent.
+      String processId = null;
+      if (pidFilePath != null) {
+        processId = getContainerPid(pidFilePath);
+      }
+
+      if (processId != null) {
+        LOG.info("Sending signal " + command + " to pid " + processId
+            + " as user " + user
+            + " for container " + containerIdStr);
+        exec.signalContainer(user, processId, command);
+      }
+    } catch (Exception e) {
+      LOG.warn("Exception when sending signal to container " + containerIdStr
+          + ": " + StringUtils.stringifyException(e));
+    }
+  }
+
+
+ /**
* Loop through for a time-bounded interval waiting to
* read the process id from a file generated by a running process.
* @param pidFilePath File from which to read the process id
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
index ce865e3..ddd8d3f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
@@ -141,6 +141,23 @@ public void handle(ContainersLauncherEvent event) {
+ ". Ignoring.");
}
break;
+      case SIGNAL_CONTAINER:
+        // Relay the command to the launch object tracked for this container.
+        SignalContainersLauncherEvent signalEvent =
+            (SignalContainersLauncherEvent) event;
+        ContainerLaunch runningContainer = running.get(containerId);
+        if (runningContainer == null) {
+          // Container not launched. So nothing needs to be done.
+          LOG.info("Container " + containerId + " not running, nothing to signal.");
+          return;
+        }
+        try {
+          runningContainer.signalContainer(signalEvent.getCommand());
+        } catch (IOException e) {
+          LOG.warn("Got exception while signalling container " + containerId
+              + ". Ignoring.", e);
+        }
+        break;
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java
index 6793bf7..92e1fcc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java
@@ -21,4 +21,5 @@
public enum ContainersLauncherEventType {
LAUNCH_CONTAINER,
CLEANUP_CONTAINER, // The process(grp) itself.
+ SIGNAL_CONTAINER,
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/SignalContainersLauncherEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/SignalContainersLauncherEvent.java
new file mode 100644
index 0000000..ac11c73
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/SignalContainersLauncherEvent.java
@@ -0,0 +1,39 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher;
+
+import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+
+/**
+ * Asks the launcher to signal a container. Raised by WebUI -> Container or by
+ * CLI -> ApplicationClientProtocol -> NM HeartbeatResponse -> ContainerManager.
+ */
+public class SignalContainersLauncherEvent extends ContainersLauncherEvent {
+  private final SignalContainerCommand command;
+  public SignalContainersLauncherEvent(
+      Container container, SignalContainerCommand command) {
+    super(container, ContainersLauncherEventType.SIGNAL_CONTAINER);
+    this.command = command;
+  }
+  public SignalContainerCommand getCommand() {
+    return command;
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/MockNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/MockNodeStatusUpdater.java
index 3f4091c..91e0b3f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/MockNodeStatusUpdater.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/MockNodeStatusUpdater.java
@@ -97,7 +97,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils
.newNodeHeartbeatResponse(heartBeatID, null, null,
- null, null, null, 1000L);
+ null, null, null, 1000L, null);
return nhResponse;
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java
index a47e7f7..6d90dbc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java
@@ -189,6 +189,18 @@ public void testStartContainerFailureWithUnknownAuxService() throws Exception {
super.testStartContainerFailureWithUnknownAuxService();
}
+  @Override
+  public void testContainerLaunchAndSignal() throws InterruptedException,
+      IOException, YarnException {
+    // The inherited test only runs when the LCE binary path was supplied.
+    if (shouldRunTest()) {
+      LOG.info("Running testContainerLaunchAndSignal");
+      super.testContainerLaunchAndSignal();
+    } else {
+      LOG.info("LCE binary path is not passed. Not running the test");
+    }
+  }
+
private boolean shouldRunTest() {
return System
.getProperty(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH) != null;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutor.java
index f840730..7dbf45e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutor.java
@@ -31,6 +31,7 @@
import java.io.PrintWriter;
import java.util.HashMap;
+import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
import org.junit.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -253,7 +254,8 @@ public void run() {
assertNotNull(pid);
LOG.info("Going to killing the process.");
- exec.signalContainer(appSubmitter, pid, Signal.TERM);
+ exec.signalContainer(appSubmitter, pid,
+ SignalContainerCommand.GRACEFUL_SHUTDOWN);
LOG.info("sleeping for 100ms to let the sleep be killed");
Thread.sleep(100);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutorWithMocks.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutorWithMocks.java
index ddffa27..98acd8d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutorWithMocks.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutorWithMocks.java
@@ -34,6 +34,7 @@
import java.util.LinkedList;
import java.util.List;
+import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
import org.junit.Assert;
import org.apache.commons.logging.Log;
@@ -271,10 +272,11 @@ public void testContainerKill() throws IOException {
String appSubmitter = "nobody";
String cmd = String.valueOf(
LinuxContainerExecutor.Commands.SIGNAL_CONTAINER.getValue());
- ContainerExecutor.Signal signal = ContainerExecutor.Signal.QUIT;
- String sigVal = String.valueOf(signal.getValue());
+ SignalContainerCommand command = SignalContainerCommand.OUTPUT_THREAD_DUMP;
+ String sigVal = String.valueOf(
+ ContainerExecutor.Signal.valueOf(command).getValue());
- mockExec.signalContainer(appSubmitter, "1000", signal);
+ mockExec.signalContainer(appSubmitter, "1000", command);
assertEquals(Arrays.asList(YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LOCAL_USER,
appSubmitter, cmd, "1000", sigVal),
readMockParams());
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java
index 5c2e085..44bcc28 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java
@@ -246,7 +246,7 @@ public NodeHeartbeatResponse nodeHeartbeat(
// notify RESYNC on first heartbeat.
return YarnServerBuilderUtils.newNodeHeartbeatResponse(1,
- NodeAction.RESYNC, null, null, null, null, 1000L);
+ NodeAction.RESYNC, null, null, null, null, 1000L, null);
}
};
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
index 0ff34a5..6e87cd4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
@@ -50,6 +50,7 @@
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.service.Service.STATE;
import org.apache.hadoop.service.ServiceOperations;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -58,6 +59,7 @@
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
import org.apache.hadoop.yarn.client.RMProxy;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
@@ -147,9 +149,15 @@ public static MasterKey createMasterKey() {
private class MyResourceTracker implements ResourceTracker {
private final Context context;
+ private boolean signalContainer;
public MyResourceTracker(Context context) {
+ this(context, false);
+ }
+
+ public MyResourceTracker(Context context, boolean signalContainer) {
this.context = context;
+ this.signalContainer = signalContainer;
}
@Override
@@ -202,7 +210,8 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
nodeStatus.setResponseId(heartBeatID++);
Map> appToContainers =
getAppToContainerStatusMap(nodeStatus.getContainersStatuses());
-
+ List containersToSignal = null;
+
ApplicationId appId1 = ApplicationId.newInstance(0, 1);
ApplicationId appId2 = ApplicationId.newInstance(0, 2);
@@ -240,6 +249,14 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
this.context.getContainers();
Assert.assertEquals(1, activeContainers.size());
+ if (this.signalContainer) {
+ containersToSignal = new ArrayList();
+ SignalContainerRequest signalReq = recordFactory
+ .newRecordInstance(SignalContainerRequest.class);
+ signalReq.setContainerId(appToContainers.get(appId1).get(0).getContainerId());
+ signalReq.setCommand(SignalContainerCommand.OUTPUT_THREAD_DUMP);
+ containersToSignal.add(signalReq);
+ }
// Give another container to the NM.
ApplicationAttemptId appAttemptID =
ApplicationAttemptId.newInstance(appId2, 0);
@@ -276,20 +293,45 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils.
newNodeHeartbeatResponse(heartBeatID, null, null, null, null, null,
- 1000L);
+ 1000L, containersToSignal);
return nhResponse;
}
}
+  /** ContainerManagerImpl stub that only records SIGNAL_CONTAINERS events. */
+  private class MyContainerManager extends ContainerManagerImpl {
+    public boolean signaled = false;
+    public MyContainerManager(Context context, ContainerExecutor exec,
+        DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater,
+        NodeManagerMetrics metrics, ApplicationACLsManager aclsManager,
+        LocalDirsHandlerService dirsHandler) {
+      super(context, exec, deletionContext, nodeStatusUpdater,
+          metrics, aclsManager, dirsHandler);
+    }
+    @Override
+    public void handle(ContainerManagerEvent event) {
+      // Only the event type is inspected; nothing is passed on to super.
+      if (ContainerManagerEventType.SIGNAL_CONTAINERS == event.getType()) {
+        signaled = true;
+      }
+    }
+  }
+
private class MyNodeStatusUpdater extends NodeStatusUpdaterImpl {
public ResourceTracker resourceTracker;
private Context context;
public MyNodeStatusUpdater(Context context, Dispatcher dispatcher,
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
+ this(context, dispatcher, healthChecker, metrics, false);
+ }
+
+ public MyNodeStatusUpdater(Context context, Dispatcher dispatcher,
+ NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics,
+ boolean signalContainer) {
super(context, dispatcher, healthChecker, metrics);
this.context = context;
- resourceTracker = new MyResourceTracker(this.context);
+ this.resourceTracker = new MyResourceTracker(this.context, signalContainer);
}
@Override
@@ -495,7 +537,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils.
newNodeHeartbeatResponse(heartBeatID, heartBeatNodeAction, null,
- null, null, null, 1000L);
+ null, null, null, 1000L, null);
nhResponse.setDiagnosticsMessage(shutDownMessage);
return nhResponse;
}
@@ -534,7 +576,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
nodeStatus.setResponseId(heartBeatID++);
NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils.
newNodeHeartbeatResponse(heartBeatID, heartBeatNodeAction, null,
- null, null, null, 1000L);
+ null, null, null, 1000L, null);
if (nodeStatus.getKeepAliveApplications() != null
&& nodeStatus.getKeepAliveApplications().size() > 0) {
@@ -674,7 +716,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
nodeStatus.setResponseId(heartBeatID);
NodeHeartbeatResponse nhResponse =
YarnServerBuilderUtils.newNodeHeartbeatResponse(heartBeatID,
- heartBeatNodeAction, null, null, null, null, 1000L);
+ heartBeatNodeAction, null, null, null, null, 1000L, null);
return nhResponse;
}
}
@@ -750,7 +792,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils.
newNodeHeartbeatResponse(heartBeatID, NodeAction.NORMAL, null,
- null, null, null, 1000L);
+ null, null, null, 1000L, null);
return nhResponse;
}
}
@@ -859,7 +901,65 @@ public void run() {
nm.stop();
}
-
+
+  /**
+   * Verifies that a signalContainer request can be dispatched from
+   * NodeStatusUpdaterImpl to ContainerManagerImpl.
+   *
+   * The NodeManager under test is wired with a MyNodeStatusUpdater created
+   * with signalContainer = true and a MyContainerManager that records
+   * whether a SIGNAL_CONTAINERS event reached it.
+   */
+  @Test
+  public void testSignalContainerToContainerManager() throws Exception {
+    nm = new NodeManager() {
+      @Override
+      protected NodeStatusUpdater createNodeStatusUpdater(Context context,
+          Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
+        return new MyNodeStatusUpdater(
+            context, dispatcher, healthChecker, metrics, true);
+      }
+
+      @Override
+      protected ContainerManagerImpl createContainerManager(Context context,
+          ContainerExecutor exec, DeletionService del,
+          NodeStatusUpdater nodeStatusUpdater,
+          ApplicationACLsManager aclsManager,
+          LocalDirsHandlerService diskhandler) {
+        return new MyContainerManager(context, exec, del, nodeStatusUpdater,
+            metrics, aclsManager, diskhandler);
+      }
+    };
+
+    YarnConfiguration conf = createNMConfig();
+    nm.init(conf);
+    nm.start();
+
+    // Give the NM up to ~20s to leave the INITED state.
+    int waitCount = 0;
+    while (nm.getServiceState() == STATE.INITED && waitCount++ != 20) {
+      LOG.info("Waiting for NM to start..");
+      if (nmStartError != null) {
+        LOG.error("Error during startup. ", nmStartError);
+        Assert.fail(nmStartError.getCause().getMessage());
+      }
+      Thread.sleep(1000);
+    }
+    if (nm.getServiceState() != STATE.STARTED) {
+      // NM could have failed.
+      Assert.fail("NodeManager failed to start");
+    }
+
+    // Wait (up to ~10s) until more than three heartbeats have been counted.
+    waitCount = 0;
+    while (heartBeatID <= 3 && waitCount++ != 20) {
+      Thread.sleep(500);
+    }
+    Assert.assertFalse(heartBeatID <= 3);
+    Assert.assertEquals("Number of registered NMs is wrong!!", 1,
+        this.registeredNodes.size());
+
+    MyContainerManager containerManager =
+        (MyContainerManager)nm.getContainerManager();
+
+    // The heartbeat counter advancing does not by itself prove that the
+    // signal event has already been delivered to the container manager, so
+    // poll for the flag (bounded, ~10s) instead of asserting immediately.
+    waitCount = 0;
+    while (!containerManager.signaled && waitCount++ != 20) {
+      Thread.sleep(500);
+    }
+    Assert.assertTrue(
+        "ContainerManager never received a SIGNAL_CONTAINERS event",
+        containerManager.signaled);
+
+    nm.stop();
+  }
+
@Test
public void testStopReentrant() throws Exception {
final AtomicInteger numCleanups = new AtomicInteger(0);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
index 8e4b0f3..18e8a6e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager;
+import static org.mockito.Mockito.spy;
+
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -171,7 +173,7 @@ public void setup() throws IOException {
delSrvc = createDeletionService();
delSrvc.init(conf);
- exec = createContainerExecutor();
+ exec = spy(createContainerExecutor());
nodeHealthChecker = new NodeHealthCheckerService();
nodeHealthChecker.init(conf);
dirsHandler = nodeHealthChecker.getDiskHandler();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
index 53a3c4b..b2550a8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
@@ -18,6 +18,10 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.verify;
+
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
@@ -42,6 +46,7 @@
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
@@ -59,6 +64,7 @@
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.SerializedException;
+import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -68,6 +74,7 @@
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.ResourceManagerConstants;
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
+import org.apache.hadoop.yarn.server.nodemanager.CMgrSignalContainersEvent;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
@@ -807,4 +814,96 @@ public static Token createContainerToken(ContainerId cId, long rmIdentifier,
containerTokenIdentifier);
return containerToken;
}
+
+  /**
+   * Verifies that containerManager can process a CMgrSignalContainersEvent
+   * properly.
+   *
+   * The test launches a container running a bash script that records its
+   * pid and then sleeps, hands the container manager a
+   * CMgrSignalContainersEvent carrying an OUTPUT_THREAD_DUMP request for
+   * that container, checks that the container executor's signalContainer
+   * was invoked, and finally stops the container and expects the
+   * TERMINATED exit status.
+   */
+  @Test
+  public void testContainerLaunchAndSignal() throws IOException,
+      InterruptedException, YarnException {
+
+    containerManager.start();
+
+    File scriptFile = new File(tmpDir, "scriptFile.sh");
+    File processStartFile =
+        new File(tmpDir, "start_file.txt").getAbsoluteFile();
+    PrintWriter fileWriter = new PrintWriter(scriptFile);
+    try {
+      // So that start file is readable by the test
+      fileWriter.write("\numask 0");
+      fileWriter.write("\necho Hello World! > " + processStartFile);
+      fileWriter.write("\necho $$ >> " + processStartFile);
+      fileWriter.write("\nexec sleep 1000s");
+    } finally {
+      // Close even if a write fails so the script file handle is not leaked.
+      fileWriter.close();
+    }
+
+    ContainerLaunchContext containerLaunchContext =
+        recordFactory.newRecordInstance(ContainerLaunchContext.class);
+
+    // ////// Construct the Container-id
+    ContainerId cId = createContainerId(0);
+
+    URL resource_alpha =
+        ConverterUtils.getYarnUrlFromPath(localFS
+            .makeQualified(new Path(scriptFile.getAbsolutePath())));
+    LocalResource rsrc_alpha =
+        recordFactory.newRecordInstance(LocalResource.class);
+    rsrc_alpha.setResource(resource_alpha);
+    rsrc_alpha.setSize(-1);
+    rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION);
+    rsrc_alpha.setType(LocalResourceType.FILE);
+    rsrc_alpha.setTimestamp(scriptFile.lastModified());
+    String destinationFile = "dest_file";
+    Map<String, LocalResource> localResources =
+        new HashMap<String, LocalResource>();
+    localResources.put(destinationFile, rsrc_alpha);
+    containerLaunchContext.setLocalResources(localResources);
+    List<String> commands = new ArrayList<String>();
+    commands.add("/bin/bash");
+    commands.add(scriptFile.getAbsolutePath());
+    containerLaunchContext.setCommands(commands);
+    StartContainerRequest scRequest =
+        StartContainerRequest.newInstance(
+            containerLaunchContext,
+            createContainerToken(cId, DUMMY_RM_IDENTIFIER,
+                context.getNodeId(), user,
+                context.getContainerTokenSecretManager()));
+    List<StartContainerRequest> list =
+        new ArrayList<StartContainerRequest>();
+    list.add(scRequest);
+    StartContainersRequest allRequests =
+        StartContainersRequest.newInstance(list);
+    containerManager.startContainers(allRequests);
+
+    // Wait (up to ~20s) for the script to create its start file.
+    int timeoutSecs = 0;
+    while (!processStartFile.exists() && timeoutSecs++ < 20) {
+      Thread.sleep(1000);
+      LOG.info("Waiting for process start-file to be created");
+    }
+    Assert.assertTrue("ProcessStartFile doesn't exist!",
+        processStartFile.exists());
+
+    // Simulate NodeStatusUpdaterImpl sending CMgrSignalContainersEvent
+    SignalContainerRequest signalContainerRequest =
+        recordFactory.newRecordInstance(SignalContainerRequest.class);
+    signalContainerRequest.setContainerId(cId);
+    signalContainerRequest.setCommand(
+        SignalContainerCommand.OUTPUT_THREAD_DUMP);
+    List<SignalContainerRequest> reqs =
+        new ArrayList<SignalContainerRequest>();
+    reqs.add(signalContainerRequest);
+    containerManager.handle(new CMgrSignalContainersEvent(reqs));
+
+    verify(exec).signalContainer(anyString(), anyString(),
+        any(SignalContainerCommand.class));
+
+    List<ContainerId> containerIds = new ArrayList<ContainerId>();
+    containerIds.add(cId);
+    StopContainersRequest stopRequest =
+        StopContainersRequest.newInstance(containerIds);
+    containerManager.stopContainers(stopRequest);
+
+    BaseContainerManagerTest.waitForContainerState(containerManager, cId,
+        ContainerState.COMPLETE);
+
+    GetContainerStatusesRequest gcsRequest =
+        GetContainerStatusesRequest.newInstance(containerIds);
+    ContainerStatus containerStatus =
+        containerManager.getContainerStatuses(gcsRequest)
+            .getContainerStatuses().get(0);
+
+    Assert.assertEquals(ExitCode.TERMINATED.getExitCode(),
+        containerStatus.getExitStatus());
+  }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java
index 1102ebb..9afc014 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java
@@ -53,6 +53,7 @@
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -61,7 +62,6 @@
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
-import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@@ -287,7 +287,7 @@ public void testContainerKillOnMemoryOverflow() throws IOException,
// Assert that the process is not alive anymore
Assert.assertFalse("Process is still alive!",
exec.signalContainer(user,
- pid, Signal.NULL));
+ pid, SignalContainerCommand.NULL));
}
@Test(timeout = 20000)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
index 1d40320..984f6be 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
@@ -394,7 +394,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
NodeHeartbeatResponse nodeHeartBeatResponse = YarnServerBuilderUtils
.newNodeHeartbeatResponse(lastNodeHeartbeatResponse.
getResponseId() + 1, NodeAction.NORMAL, null, null, null, null,
- nextHeartBeatInterval);
+ nextHeartBeatInterval, null);
rmNode.updateNodeHeartbeatResponseForCleanup(nodeHeartBeatResponse);
populateKeys(request, nodeHeartBeatResponse);