diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SignalContainerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SignalContainerRequest.java new file mode 100644 index 0000000..c9741e5 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SignalContainerRequest.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.SignalContainerCommand; +import org.apache.hadoop.yarn.util.Records; + +/** + *

The request sent by the client to the ResourceManager + * to signal a container.

+ * + */ +@Public +@Stable +public abstract class SignalContainerRequest { + + @Public + @Stable + public static SignalContainerRequest newInstance(ContainerId containerId, + SignalContainerCommand signalContainerCommand, + String diagnostics) { + SignalContainerRequest request = + Records.newRecord(SignalContainerRequest.class); + request.setContainerId(containerId); + request.setCommand(signalContainerCommand); + request.setDiagnostics(diagnostics); + return request; + } + + /** + * Get the ContainerId of the container to signal. + * @return ContainerId of the container to signal. + */ + @Public + @Stable + public abstract ContainerId getContainerId(); + + /** + * Set the ContainerId of the container to signal. + */ + @Public + @Stable + public abstract void setContainerId(ContainerId containerId); + + /** + * Get the SignalContainerCommand of the signal request. + * @return SignalContainerCommand of the signal request. + */ + @Public + @Stable + public abstract SignalContainerCommand getCommand(); + + /** + * Set the SignalContainerCommand of the signal request. + */ + @Public + @Stable + public abstract void setCommand(SignalContainerCommand command); + + /** + * Get the diagnostics string of the signal request. + * @return diagnostics of the signal request. + */ + @Public + @Stable + public abstract String getDiagnostics(); + + /** + * Set the diagnostics string of the signal request. 
+ */ + @Public + @Stable + public abstract void setDiagnostics(String diagnostics); + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SignalContainerResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SignalContainerResponse.java new file mode 100644 index 0000000..f5255b9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SignalContainerResponse.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.util.Records; + +/** + *

The response sent by the ResourceManager to the client + * signalling a container.

+ * + */ +@Public +@Unstable +public abstract class SignalContainerResponse { + + @Public + @Unstable + public static SignalContainerResponse newInstance( + boolean isCommandSubmitted) { + SignalContainerResponse response = + Records.newRecord(SignalContainerResponse.class); + response.setIsCommandSubmitted(isCommandSubmitted); + return response; + } + + /** + * Get the flag which indicates that the signal command is submitted or not. + */ + @Public + @Unstable + public abstract boolean getIsCommandSubmitted(); + + /** + * Set the flag which indicates that the signal command is submitted or not. + */ + @Public + @Unstable + public abstract void setIsCommandSubmitted(boolean isCommandSubmitted); + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/SignalContainerCommand.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/SignalContainerCommand.java new file mode 100644 index 0000000..2554aed --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/SignalContainerCommand.java @@ -0,0 +1,67 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +package org.apache.hadoop.yarn.api.records; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Stable; + +/** + * Enumeration of various signal container commands. + */ +@Public +@Stable +public enum SignalContainerCommand { + + /** + * On Linux, it is equivalent to signal 0. + */ + NULL, + + /** + * Used to capture thread dump. + * On Linux, it is equivalent to SIGQUIT. + */ + OUTPUT_THREAD_DUMP, + + /** Gracefully pause a container. + * On Linux, it is equivalent to SIGTSTP. + */ + GRACEFUL_PAUSE, + + /** Forcefully pause a container. + * On Linux, it is equivalent to SIGSTOP. + */ + FORCEFUL_PAUSE, + + /** Resume a paused container. + * On Linux, it is equivalent to SIGCONT. + */ + RESUME, + + /** Gracefully shutdown a container. + * On Linux, it is equivalent to SIGTERM. + */ + GRACEFUL_SHUTDOWN, + + /** Forcefully shutdown a container. + * On Linux, it is equivalent to SIGKILL. + */ + FORCEFUL_SHUTDOWN, + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index 48aac9d..abf949d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -329,6 +329,16 @@ message QueueUserACLInfoProto { repeated QueueACLProto userAcls = 2; } +enum SignalContainerCommandProto { + NULL = 1; + OUTPUT_THREAD_DUMP = 2; + GRACEFUL_PAUSE = 3; + FORCEFUL_PAUSE = 4; + RESUME = 5; + GRACEFUL_SHUTDOWN = 6; + FORCEFUL_SHUTDOWN = 7; +} + //////////////////////////////////////////////////////////////////////// ////// From container_manager ////////////////////////////////////////// //////////////////////////////////////////////////////////////////////// diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index a1f6d2e..d358e25 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -187,6 +187,15 @@ message GetQueueUserAclsInfoResponseProto { repeated QueueUserACLInfoProto queueUserAcls = 1; } +message SignalContainerRequestProto { + optional ContainerIdProto container_id = 1; + optional SignalContainerCommandProto command = 2; + optional string diagnostics = 3; +} + +message SignalContainerResponseProto { + optional bool is_command_submitted = 1 [default = false]; +} ////////////////////////////////////////////////////// /////// client_NM_Protocol /////////////////////////// diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SignalContainerRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SignalContainerRequestPBImpl.java new file mode 100644 index 0000000..4b552c3 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SignalContainerRequestPBImpl.java @@ -0,0 +1,195 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; + + +import com.google.protobuf.TextFormat; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; +import org.apache.hadoop.yarn.api.records.SignalContainerCommand; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; +import org.apache.hadoop.yarn.proto.YarnProtos.SignalContainerCommandProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProtoOrBuilder; + + +public class SignalContainerRequestPBImpl + extends SignalContainerRequest { + SignalContainerRequestProto proto = + SignalContainerRequestProto.getDefaultInstance(); + SignalContainerRequestProto.Builder builder = null; + boolean viaProto = false; + + private ContainerId containerId; + private SignalContainerCommand command = null; + private String diagnostics; + + private static SignalContainerCommand convertFromProtoFormat( + SignalContainerCommandProto p) { + return SignalContainerCommand.valueOf(p.name()); + } + + private static SignalContainerCommandProto convertToProtoFormat( + SignalContainerCommand p) { + return SignalContainerCommandProto.valueOf(p.name()); + } + + public SignalContainerRequestPBImpl() { + builder = SignalContainerRequestProto.newBuilder(); + } + + public SignalContainerRequestPBImpl(SignalContainerRequestProto proto) { + this.proto = 
proto; + viaProto = true; + } + + public SignalContainerRequestProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + private void mergeLocalToBuilder() { + if (this.containerId != null) { + builder.setContainerId(convertToProtoFormat(this.containerId)); + } + + if (this.command != null) { + builder.setCommand(convertToProtoFormat(this.command)); + } + + if (diagnostics != null) { + builder.setDiagnostics(diagnostics); + } + } + + private void mergeLocalToProto() { + if (viaProto) + maybeInitBuilder(); + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = SignalContainerRequestProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) + return false; + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + + @Override + public ContainerId getContainerId() { + SignalContainerRequestProtoOrBuilder p = viaProto ? proto : builder; + if (this.containerId != null) { + return this.containerId; + } + if (!p.hasContainerId()) { + return null; + } + this.containerId = convertFromProtoFormat(p.getContainerId()); + return this.containerId; + } + + @Override + public void setContainerId(ContainerId containerId) { + maybeInitBuilder(); + if (containerId == null) { + builder.clearContainerId(); + } + this.containerId = containerId; + } + + private void initCommand() { + if (this.command != null) { + return; + } + SignalContainerRequestProtoOrBuilder p = viaProto ? 
proto : builder; + if(p.hasCommand()) { + this.command = convertFromProtoFormat(p.getCommand()); + } + } + + @Override + public SignalContainerCommand getCommand() { + initCommand(); + return command; + } + + @Override + public void setCommand(SignalContainerCommand command) { + maybeInitBuilder(); + if (command == null) { + builder.clearCommand(); + } + this.command = command; + } + + @Override + public String getDiagnostics() { + if (diagnostics != null) { + return diagnostics; + } + final SignalContainerRequestProtoOrBuilder p = viaProto ? proto : builder; + if (p.hasDiagnostics()) { + diagnostics = p.getDiagnostics(); + } + return diagnostics; + } + + @Override + public void setDiagnostics(String diagnostics) { + maybeInitBuilder(); + if (diagnostics == null) { + builder.clearDiagnostics(); + } + this.diagnostics = diagnostics; + } + + private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) { + return new ContainerIdPBImpl(p); + } + + private ContainerIdProto convertToProtoFormat(ContainerId t) { + return ((ContainerIdPBImpl)t).getProto(); + } + +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SignalContainerResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SignalContainerResponsePBImpl.java new file mode 100644 index 0000000..d3a9ba8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SignalContainerResponsePBImpl.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; + + +import com.google.protobuf.TextFormat; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerResponseProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerResponseProtoOrBuilder; + + +public class SignalContainerResponsePBImpl + extends SignalContainerResponse { + SignalContainerResponseProto proto = SignalContainerResponseProto.getDefaultInstance(); + SignalContainerResponseProto.Builder builder = null; + boolean viaProto = false; + + public SignalContainerResponsePBImpl() { + builder = SignalContainerResponseProto.newBuilder(); + } + + public SignalContainerResponsePBImpl(SignalContainerResponseProto proto) { + this.proto = proto; + viaProto = true; + } + + public SignalContainerResponseProto getProto() { + proto = viaProto ? 
proto : builder.build(); + viaProto = true; + return proto; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = SignalContainerResponseProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) + return false; + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + + @Override + public boolean getIsCommandSubmitted() { + SignalContainerResponseProtoOrBuilder p = viaProto ? proto : builder; + return p.getIsCommandSubmitted(); + } + + @Override + public void setIsCommandSubmitted(boolean isCommandSubmitted) { + maybeInitBuilder(); + builder.setIsCommandSubmitted(isCommandSubmitted); + } + +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java index 38dfa58..50b6412 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java @@ -20,6 +20,7 @@ import java.util.List; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import 
org.apache.hadoop.yarn.server.api.records.MasterKey; @@ -38,18 +39,22 @@ MasterKey getContainerTokenMasterKey(); void setContainerTokenMasterKey(MasterKey secretKey); - + MasterKey getNMTokenMasterKey(); void setNMTokenMasterKey(MasterKey secretKey); void addAllContainersToCleanup(List containers); - + void addAllApplicationsToCleanup(List applications); long getNextHeartBeatInterval(); void setNextHeartBeatInterval(long nextHeartBeatInterval); - + String getDiagnosticsMessage(); void setDiagnosticsMessage(String diagnosticsMessage); + + List getContainersToSignalList(); + void addAllContainersToSignal(List containers); + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java index 775f95a..e99c66b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java @@ -21,7 +21,8 @@ import java.util.ArrayList; import java.util.List; import java.util.Iterator; - +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerRequestPBImpl; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; @@ -33,6 +34,7 @@ import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto; import 
org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProtoOrBuilder; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProto; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeAction; @@ -49,7 +51,8 @@ private List applicationsToCleanup = null; private MasterKey containerTokenMasterKey = null; private MasterKey nmTokenMasterKey = null; - + private List containersToSignal = null; + public NodeHeartbeatResponsePBImpl() { builder = NodeHeartbeatResponseProto.newBuilder(); } @@ -81,6 +84,9 @@ private void mergeLocalToBuilder() { builder.setNmTokenMasterKey( convertToProtoFormat(this.nmTokenMasterKey)); } + if (this.containersToSignal != null) { + addContainersToSignalToProto(); + } } private void mergeLocalToProto() { @@ -362,5 +368,75 @@ private MasterKeyPBImpl convertFromProtoFormat(MasterKeyProto p) { private MasterKeyProto convertToProtoFormat(MasterKey t) { return ((MasterKeyPBImpl) t).getProto(); } + + @Override + public List getContainersToSignalList() { + initContainersToSignal(); + return this.containersToSignal; + } + + private void initContainersToSignal() { + if (this.containersToSignal != null) { + return; + } + NodeHeartbeatResponseProtoOrBuilder p = viaProto ? 
proto : builder; + List list = p.getContainersToSignalList(); + this.containersToSignal = new ArrayList(); + + for (SignalContainerRequestProto c : list) { + this.containersToSignal.add(convertFromProtoFormat(c)); + } + } + + @Override + public void addAllContainersToSignal( + final List containersToSignal) { + if (containersToSignal == null) + return; + initContainersToSignal(); + this.containersToSignal.addAll(containersToSignal); + } + + private void addContainersToSignalToProto() { + maybeInitBuilder(); + builder.clearContainersToSignal(); + if (containersToSignal == null) + return; + + Iterable iterable = + new Iterable() { + @Override + public Iterator iterator() { + return new Iterator() { + Iterator iter = containersToSignal.iterator(); + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public SignalContainerRequestProto next() { + return convertToProtoFormat(iter.next()); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }; + builder.addAllContainersToSignal(iterable); + } + + private SignalContainerRequestPBImpl convertFromProtoFormat( + SignalContainerRequestProto p) { + return new SignalContainerRequestPBImpl(p); + } + + private SignalContainerRequestProto convertToProtoFormat( + SignalContainerRequest t) { + return ((SignalContainerRequestPBImpl)t).getProto(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerBuilderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerBuilderUtils.java index 8bdff62..a28beb4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerBuilderUtils.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerBuilderUtils.java @@ -20,6 +20,7 @@ import java.util.List; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.SerializedException; @@ -43,7 +44,8 @@ public static NodeHeartbeatResponse newNodeHeartbeatResponse(int responseId, NodeAction action, List containersToCleanUp, List applicationsToCleanUp, MasterKey containerTokenMasterKey, MasterKey nmTokenMasterKey, - long nextHeartbeatInterval) { + long nextHeartbeatInterval, + List containersToSignal) { NodeHeartbeatResponse response = recordFactory .newRecordInstance(NodeHeartbeatResponse.class); response.setResponseId(responseId); @@ -57,6 +59,9 @@ public static NodeHeartbeatResponse newNodeHeartbeatResponse(int responseId, if(applicationsToCleanUp != null) { response.addAllApplicationsToCleanup(applicationsToCleanUp); } + if (containersToSignal != null) { + response.addAllContainersToSignal(containersToSignal); + } return response; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index c544905..af9473b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -24,6 +24,7 @@ package hadoop.yarn; import "yarn_protos.proto"; import "yarn_server_common_protos.proto"; +import "yarn_service_protos.proto"; message RegisterNodeManagerRequestProto { optional 
NodeIdProto node_id = 1; @@ -57,4 +58,5 @@ message NodeHeartbeatResponseProto { repeated ApplicationIdProto applications_to_cleanup = 6; optional int64 nextHeartBeatInterval = 7; optional string diagnostics_message = 8; + repeated SignalContainerRequestProto containers_to_signal = 9; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/CMgrSignalContainersEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/CMgrSignalContainersEvent.java new file mode 100644 index 0000000..cc967b9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/CMgrSignalContainersEvent.java @@ -0,0 +1,38 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +package org.apache.hadoop.yarn.server.nodemanager; + +import java.util.List; + +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; + +public class CMgrSignalContainersEvent extends ContainerManagerEvent { + + private List containerToSignal; + + public CMgrSignalContainersEvent(List containerToSignal) { + super(ContainerManagerEventType.SIGNAL_CONTAINERS); + this.containerToSignal = containerToSignal; + } + + public List getContainersToSignal() { + return this.containerToSignal; + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java index ee72fbc..67559a2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java @@ -35,8 +35,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.util.Shell.ShellCommandExecutor; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.SignalContainerCommand; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent; @@ -120,7 +120,7 @@ public abstract int launchContainer(Container container, List logDirs) throws IOException; public abstract boolean signalContainer(String user, String pid, - Signal signal) + 
SignalContainerCommand command) throws IOException; public abstract void deleteAsUser(String user, Path subDir, Path... basedirs) @@ -147,15 +147,20 @@ public String toString() { /** * The constants for the signals. + * Applicable to POSIX-compliant operating systems */ public enum Signal { - NULL(0, "NULL"), QUIT(3, "SIGQUIT"), - KILL(9, "SIGKILL"), TERM(15, "SIGTERM"); - private final int value; + NULL(0, "NULL", SignalContainerCommand.NULL), + QUIT(3, "SIGQUIT", SignalContainerCommand.OUTPUT_THREAD_DUMP), + KILL(9, "SIGKILL", SignalContainerCommand.FORCEFUL_SHUTDOWN), + TERM(15, "SIGTERM", SignalContainerCommand.GRACEFUL_SHUTDOWN); + private final int value; // actual value used in the "kill" command private final String str; - private Signal(int value, String str) { + private final SignalContainerCommand command; // OS-independent command + private Signal(int value, String str, SignalContainerCommand command) { this.str = str; this.value = value; + this.command = command; } public int getValue() { return value; @@ -164,6 +169,17 @@ public int getValue() { public String toString() { return str; } + public static Signal valueOf(SignalContainerCommand command) { + if (command == QUIT.command) { + return QUIT; + } else if (command == KILL.command) { + return KILL; + } else if (command == TERM.command) { + return TERM; + } else { /* NOTE(review): pause/resume commands have no Signal and map to NULL */ + return NULL; + } + } } protected void logOutput(String output) { @@ -297,16 +313,16 @@ public String getProcessId(ContainerId containerID) { private final String user; private final String pid; private final long delay; - private final Signal signal; + private final SignalContainerCommand command; private final ContainerExecutor containerExecutor; public DelayedProcessKiller(Container container, String user, String pid, - long delay, Signal signal, ContainerExecutor containerExecutor) { + long delay, SignalContainerCommand command, ContainerExecutor containerExecutor) { this.container = container; this.user = user; this.pid = pid; 
this.delay = delay; - this.signal = signal; + this.command = command; this.containerExecutor = containerExecutor; setName("Task killer for " + pid); setDaemon(false); @@ -315,7 +331,7 @@ public DelayedProcessKiller(Container container, String user, String pid, public void run() { try { Thread.sleep(delay); - containerExecutor.signalContainer(user, pid, signal); + containerExecutor.signalContainer(user, pid, command); } catch (InterruptedException e) { return; } catch (IOException e) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerManagerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerManagerEventType.java index 4278ce0..8a7e423 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerManagerEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerManagerEventType.java @@ -21,4 +21,5 @@ public enum ContainerManagerEventType { FINISH_APPS, FINISH_CONTAINERS, + SIGNAL_CONTAINERS, } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java index 9e2e111..06fee41 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java @@ -43,6 +43,7 @@ import org.apache.hadoop.util.Shell.ShellCommandExecutor; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.SignalContainerCommand; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; @@ -312,15 +313,15 @@ public void writeLocalWrapperScript(Path launchDst, Path pidFile, } @Override - public boolean signalContainer(String user, String pid, Signal signal) - throws IOException { - LOG.debug("Sending signal " + signal.getValue() + " to pid " + pid + public boolean signalContainer(String user, String pid, + SignalContainerCommand command) throws IOException { + LOG.debug("Sending signal " + command + " to pid " + pid + " as user " + user); if (!containerIsAlive(pid)) { return false; } try { - killContainer(pid, signal); + signalContainer(pid, Signal.valueOf(command)); } catch (IOException e) { if (!containerIsAlive(pid)) { return false; @@ -357,7 +358,7 @@ public static boolean containerIsAlive(String pid) throws IOException { * @param signal signal to send * (for logging). 
*/ - private void killContainer(String pid, Signal signal) throws IOException { + private void signalContainer(String pid, Signal signal) throws IOException { new ShellCommandExecutor(Shell.getSignalKillCommand(signal.getValue(), pid)) .execute(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java index cbdcb13..ffd50af 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java @@ -37,6 +37,7 @@ import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.SignalContainerCommand; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent; @@ -317,8 +318,8 @@ public int launchContainer(Container container, } @Override - public boolean signalContainer(String user, String pid, Signal signal) - throws IOException { + public boolean signalContainer(String user, String pid, + SignalContainerCommand signalCommand) throws IOException { verifyUsernamePattern(user); String runAsUser = getRunAsUser(user); @@ -329,7 +330,7 @@ public boolean signalContainer(String user, String pid, Signal signal) user, Integer.toString(Commands.SIGNAL_CONTAINER.getValue()), pid, - 
Integer.toString(signal.getValue()) }; + Integer.toString(Signal.valueOf(signalCommand).getValue()) }; ShellCommandExecutor shExec = new ShellCommandExecutor(command); if (LOG.isDebugEnabled()) { LOG.debug("signalContainer: " + Arrays.toString(command)); @@ -341,11 +342,11 @@ public boolean signalContainer(String user, String pid, Signal signal) if (ret_code == ResultCode.INVALID_CONTAINER_PID.getValue()) { return false; } - LOG.warn("Error in signalling container " + pid + " with " + signal + LOG.warn("Error in signalling container " + pid + " with " + signalCommand + "; exit = " + ret_code, e); logOutput(shExec.getOutput()); throw new IOException("Problem signalling container " + pid + " with " - + signal + "; output: " + shExec.getOutput() + " and exitCode: " + + signalCommand + "; output: " + shExec.getOutput() + " and exitCode: " + ret_code, e); } return true; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 4db000c..a2c66dc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -40,6 +40,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.util.VersionUtil; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import 
org.apache.hadoop.yarn.api.records.ContainerState; @@ -533,6 +534,18 @@ public void run() { new CMgrCompletedAppsEvent(appsToCleanup, CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER)); } + + // SignalContainer request comes from application's call to + // ApplicationClientProtocol's SignalContainer; RM forwards the request to NM after + // authorization check. NodeStatusUpdaterImpl forwards the request to ContainerManager. + // ContainerManager will dispatch the request to ContainerLauncher. + List containersToSignal = + response.getContainersToSignalList(); + if (containersToSignal.size() != 0) { + dispatcher.getEventHandler().handle( + new CMgrSignalContainersEvent(containersToSignal)); + } + } catch (ConnectException e) { //catch and throw the exception if tried MAX wait time to connect RM dispatcher.getEventHandler().handle( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index dd3deb3..a73411a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; import 
org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; @@ -83,6 +84,7 @@ import org.apache.hadoop.yarn.security.NMTokenIdentifier; import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent; import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedContainersEvent; +import org.apache.hadoop.yarn.server.nodemanager.CMgrSignalContainersEvent; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.ContainerManagerEvent; import org.apache.hadoop.yarn.server.nodemanager.Context; @@ -106,6 +108,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.SignalContainersLauncherEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService; @@ -879,6 +882,23 @@ public void handle(ContainerManagerEvent event) { "Container Killed by ResourceManager")); } break; + case SIGNAL_CONTAINERS: + CMgrSignalContainersEvent containersSignalEvent = + (CMgrSignalContainersEvent) event; + for (SignalContainerRequest request : containersSignalEvent + .getContainersToSignal()) { + ContainerId containerId = request.getContainerId(); + Container container = this.context.getContainers().get(containerId); + if (container != null) { + LOG.info("Container " + containerId + " signal request by 
ResourceManager"); + this.dispatcher.getEventHandler().handle( + new SignalContainersLauncherEvent(container, + request.getCommand())); + } else { + LOG.info("Container " + containerId + " no longer exists"); + } + } + break; default: throw new YarnRuntimeException( "Got an unknown ContainerManagerEvent type: " + event.getType()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java index e252e35..f92466a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java @@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.SignalContainerCommand; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.ipc.RPCUtil; @@ -393,20 +394,21 @@ public void cleanupContainer() throws IOException { + " as user " + user + " for container " + containerIdStr); - final Signal signal = sleepDelayBeforeSigKill > 0 - ? Signal.TERM - : Signal.KILL; + final SignalContainerCommand command = sleepDelayBeforeSigKill > 0 + ? 
SignalContainerCommand.GRACEFUL_SHUTDOWN + : SignalContainerCommand.FORCEFUL_SHUTDOWN; - boolean result = exec.signalContainer(user, processId, signal); + boolean result = exec.signalContainer(user, processId, command); - LOG.debug("Sent signal " + signal + " to pid " + processId + LOG.debug("Sent signal " + command + " to pid " + processId + " as user " + user + " for container " + containerIdStr + ", result=" + (result? "success" : "failed")); if (sleepDelayBeforeSigKill > 0) { new DelayedProcessKiller(container, user, - processId, sleepDelayBeforeSigKill, Signal.KILL, exec).start(); + processId, sleepDelayBeforeSigKill, + SignalContainerCommand.FORCEFUL_SHUTDOWN, exec).start(); } } } catch (Exception e) { @@ -426,6 +428,54 @@ public void cleanupContainer() throws IOException { } /** + * Send a signal to the container. + * @throws IOException + */ + @SuppressWarnings("unchecked") // dispatcher not typed + public void signalContainer(SignalContainerCommand command) throws IOException { + ContainerId containerId = container.getContainerId(); + String containerIdStr = ConverterUtils.toString(containerId); + String user = container.getUser(); + + LOG.info("Sending signal " + command + " to container " + containerIdStr); + + boolean alreadyLaunched = !shouldLaunchContainer.compareAndSet(false, true); + if (!alreadyLaunched) { + LOG.info("Container " + containerIdStr + " not launched." + + " Not sending the signal"); + return; + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Getting pid for container " + containerIdStr + + " to send signal to from pid file " + + (pidFilePath != null ? 
pidFilePath.toString() : "null")); + } + + try { + // get process id from pid file if available + // else if shell is still active, get it from the shell + String processId = null; + if (pidFilePath != null) { + processId = getContainerPid(pidFilePath); + } + + if (processId != null) { + LOG.info("Sending signal " + command + " to pid " + processId + + " as user " + user + + " for container " + containerIdStr); + exec.signalContainer(user, processId, command); + } + } catch (Exception e) { + String message = + "Exception when sending signal to container " + containerIdStr + + ": " + StringUtils.stringifyException(e); + LOG.warn(message); + } + } + + + /** * Loop through for a time-bounded interval waiting to * read the process id from a file generated by a running process. * @param pidFilePath File from which to read the process id diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java index ce865e3..ddd8d3f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java @@ -141,6 +141,23 @@ public void handle(ContainersLauncherEvent event) { + ". Ignoring."); } break; + case SIGNAL_CONTAINER: + SignalContainersLauncherEvent signalEvent = + (SignalContainersLauncherEvent) event; + ContainerLaunch runningContainer = running.get(containerId); + if (runningContainer == null) { + // Container not launched. 
So nothing needs to be done. + LOG.info("Container " + containerId + " not running, nothing to signal."); + return; + } + + try { + runningContainer.signalContainer(signalEvent.getCommand()); + } catch (IOException e) { + LOG.warn("Got exception while cleaning container " + containerId + + ". Ignoring."); + } + break; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java index 6793bf7..92e1fcc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java @@ -21,4 +21,5 @@ public enum ContainersLauncherEventType { LAUNCH_CONTAINER, CLEANUP_CONTAINER, // The process(grp) itself. 
+ SIGNAL_CONTAINER, } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/SignalContainersLauncherEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/SignalContainersLauncherEvent.java new file mode 100644 index 0000000..ac11c73 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/SignalContainersLauncherEvent.java @@ -0,0 +1,39 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher; + +import org.apache.hadoop.yarn.api.records.SignalContainerCommand; +import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; + +// This event can be triggered in one of the following ways +// WebUI -> Container +// CLI -> ApplicationClientProtocol -> NM HeartbeatResponse -> ContainerManager +public class SignalContainersLauncherEvent extends ContainersLauncherEvent{ + + private final SignalContainerCommand command; + public SignalContainersLauncherEvent( + Container container, SignalContainerCommand command) { + super(container, ContainersLauncherEventType.SIGNAL_CONTAINER); + this.command = command; + } + public SignalContainerCommand getCommand() { + return command; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/MockNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/MockNodeStatusUpdater.java index 3f4091c..91e0b3f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/MockNodeStatusUpdater.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/MockNodeStatusUpdater.java @@ -97,7 +97,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils .newNodeHeartbeatResponse(heartBeatID, null, null, - null, null, null, 1000L); + null, null, null, 1000L, null); return nhResponse; } } diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java index a47e7f7..6d90dbc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java @@ -189,6 +189,18 @@ public void testStartContainerFailureWithUnknownAuxService() throws Exception { super.testStartContainerFailureWithUnknownAuxService(); } + @Override + public void testContainerLaunchAndSignal() throws InterruptedException, + IOException, YarnException { + // Don't run the test if the binary is not available. + if (!shouldRunTest()) { + LOG.info("LCE binary path is not passed. 
Not running the test"); + return; + } + LOG.info("Running testContainerLaunchAndSignal"); + super.testContainerLaunchAndSignal(); + } + private boolean shouldRunTest() { return System .getProperty(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH) != null; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutor.java index f840730..7dbf45e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutor.java @@ -31,6 +31,7 @@ import java.io.PrintWriter; import java.util.HashMap; +import org.apache.hadoop.yarn.api.records.SignalContainerCommand; import org.junit.Assert; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -253,7 +254,8 @@ public void run() { assertNotNull(pid); LOG.info("Going to killing the process."); - exec.signalContainer(appSubmitter, pid, Signal.TERM); + exec.signalContainer(appSubmitter, pid, + SignalContainerCommand.GRACEFUL_SHUTDOWN); LOG.info("sleeping for 100ms to let the sleep be killed"); Thread.sleep(100); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutorWithMocks.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutorWithMocks.java index ddffa27..98acd8d 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutorWithMocks.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutorWithMocks.java @@ -34,6 +34,7 @@ import java.util.LinkedList; import java.util.List; +import org.apache.hadoop.yarn.api.records.SignalContainerCommand; import org.junit.Assert; import org.apache.commons.logging.Log; @@ -271,10 +272,11 @@ public void testContainerKill() throws IOException { String appSubmitter = "nobody"; String cmd = String.valueOf( LinuxContainerExecutor.Commands.SIGNAL_CONTAINER.getValue()); - ContainerExecutor.Signal signal = ContainerExecutor.Signal.QUIT; - String sigVal = String.valueOf(signal.getValue()); + SignalContainerCommand command = SignalContainerCommand.OUTPUT_THREAD_DUMP; + String sigVal = String.valueOf( + ContainerExecutor.Signal.valueOf(command).getValue()); - mockExec.signalContainer(appSubmitter, "1000", signal); + mockExec.signalContainer(appSubmitter, "1000", command); assertEquals(Arrays.asList(YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LOCAL_USER, appSubmitter, cmd, "1000", sigVal), readMockParams()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java index 5c2e085..44bcc28 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java @@ -246,7 +246,7 @@ public NodeHeartbeatResponse nodeHeartbeat( // notify RESYNC on first heartbeat. return YarnServerBuilderUtils.newNodeHeartbeatResponse(1, - NodeAction.RESYNC, null, null, null, null, 1000L); + NodeAction.RESYNC, null, null, null, null, 1000L, null); } }; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index 0ff34a5..6e87cd4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -50,6 +50,7 @@ import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.service.Service.STATE; import org.apache.hadoop.service.ServiceOperations; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -58,6 +59,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.SignalContainerCommand; import org.apache.hadoop.yarn.client.RMProxy; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; @@ -147,9 +149,15 @@ public static 
MasterKey createMasterKey() { private class MyResourceTracker implements ResourceTracker { private final Context context; + private boolean signalContainer; public MyResourceTracker(Context context) { + this(context, false); + } + + public MyResourceTracker(Context context, boolean signalContainer) { this.context = context; + this.signalContainer = signalContainer; } @Override @@ -202,7 +210,8 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) nodeStatus.setResponseId(heartBeatID++); Map> appToContainers = getAppToContainerStatusMap(nodeStatus.getContainersStatuses()); - + List containersToSignal = null; + ApplicationId appId1 = ApplicationId.newInstance(0, 1); ApplicationId appId2 = ApplicationId.newInstance(0, 2); @@ -240,6 +249,14 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) this.context.getContainers(); Assert.assertEquals(1, activeContainers.size()); + if (this.signalContainer) { + containersToSignal = new ArrayList(); + SignalContainerRequest signalReq = recordFactory + .newRecordInstance(SignalContainerRequest.class); + signalReq.setContainerId(appToContainers.get(appId1).get(0).getContainerId()); + signalReq.setCommand(SignalContainerCommand.OUTPUT_THREAD_DUMP); + containersToSignal.add(signalReq); + } // Give another container to the NM. ApplicationAttemptId appAttemptID = ApplicationAttemptId.newInstance(appId2, 0); @@ -276,20 +293,45 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils. 
newNodeHeartbeatResponse(heartBeatID, null, null, null, null, null, - 1000L); + 1000L, containersToSignal); return nhResponse; } } + private class MyContainerManager extends ContainerManagerImpl { + public boolean signaled = false; + + public MyContainerManager(Context context, ContainerExecutor exec, + DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater, + NodeManagerMetrics metrics, ApplicationACLsManager aclsManager, + LocalDirsHandlerService dirsHandler) { + super(context, exec, deletionContext, nodeStatusUpdater, + metrics, aclsManager, dirsHandler); + } + + @Override + public void handle(ContainerManagerEvent event) { + if (event.getType() == ContainerManagerEventType.SIGNAL_CONTAINERS) { + signaled = true; + } + } + } + private class MyNodeStatusUpdater extends NodeStatusUpdaterImpl { public ResourceTracker resourceTracker; private Context context; public MyNodeStatusUpdater(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { + this(context, dispatcher, healthChecker, metrics, false); + } + + public MyNodeStatusUpdater(Context context, Dispatcher dispatcher, + NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics, + boolean signalContainer) { super(context, dispatcher, healthChecker, metrics); this.context = context; - resourceTracker = new MyResourceTracker(this.context); + this.resourceTracker = new MyResourceTracker(this.context, signalContainer); } @Override @@ -495,7 +537,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils. 
newNodeHeartbeatResponse(heartBeatID, heartBeatNodeAction, null, - null, null, null, 1000L); + null, null, null, 1000L, null); nhResponse.setDiagnosticsMessage(shutDownMessage); return nhResponse; } @@ -534,7 +576,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) nodeStatus.setResponseId(heartBeatID++); NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils. newNodeHeartbeatResponse(heartBeatID, heartBeatNodeAction, null, - null, null, null, 1000L); + null, null, null, 1000L, null); if (nodeStatus.getKeepAliveApplications() != null && nodeStatus.getKeepAliveApplications().size() > 0) { @@ -674,7 +716,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) nodeStatus.setResponseId(heartBeatID); NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils.newNodeHeartbeatResponse(heartBeatID, - heartBeatNodeAction, null, null, null, null, 1000L); + heartBeatNodeAction, null, null, null, null, 1000L, null); return nhResponse; } } @@ -750,7 +792,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils. newNodeHeartbeatResponse(heartBeatID, NodeAction.NORMAL, null, - null, null, null, 1000L); + null, null, null, 1000L, null); return nhResponse; } } @@ -859,7 +901,65 @@ public void run() { nm.stop(); } - + + //Verify that signalContainer request can be dispatched from + //NodeStatusUpdaterImpl to ContainerManagerImpl. 
+ @Test + public void testSignalContainerToContainerManager() throws Exception { + nm = new NodeManager() { + @Override + protected NodeStatusUpdater createNodeStatusUpdater(Context context, + Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + return new MyNodeStatusUpdater( + context, dispatcher, healthChecker, metrics, true); + } + + @Override + protected ContainerManagerImpl createContainerManager(Context context, + ContainerExecutor exec, DeletionService del, + NodeStatusUpdater nodeStatusUpdater, + ApplicationACLsManager aclsManager, + LocalDirsHandlerService diskhandler) { + return new MyContainerManager(context, exec, del, nodeStatusUpdater, + metrics, aclsManager, diskhandler); + } + + }; + + YarnConfiguration conf = createNMConfig(); + nm.init(conf); + nm.start(); + + int waitCount = 0; + while (nm.getServiceState() == STATE.INITED && waitCount++ != 20) { + LOG.info("Waiting for NM to start.."); + if (nmStartError != null) { + LOG.error("Error during startup. ", nmStartError); + Assert.fail(nmStartError.getCause().getMessage()); + } + Thread.sleep(1000); + } + if (nm.getServiceState() != STATE.STARTED) { + // NM could have failed. 
+ Assert.fail("NodeManager failed to start"); + } + + waitCount = 0; + while (heartBeatID <= 3 && waitCount++ != 20) { + Thread.sleep(500); + } + Assert.assertFalse(heartBeatID <= 3); + Assert.assertEquals("Number of registered NMs is wrong!!", 1, + this.registeredNodes.size()); + + MyContainerManager containerManager = + (MyContainerManager)nm.getContainerManager(); + Assert.assertTrue(containerManager.signaled); + + nm.stop(); + + } + @Test public void testStopReentrant() throws Exception { final AtomicInteger numCleanups = new AtomicInteger(0); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java index 8e4b0f3..18e8a6e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager; +import static org.mockito.Mockito.spy; + import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; @@ -171,7 +173,7 @@ public void setup() throws IOException { delSrvc = createDeletionService(); delSrvc.init(conf); - exec = createContainerExecutor(); + exec = spy(createContainerExecutor()); nodeHealthChecker = new NodeHealthCheckerService(); nodeHealthChecker.init(conf); dirsHandler = nodeHealthChecker.getDiskHandler(); diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java index 53a3c4b..b2550a8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java @@ -18,6 +18,10 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.verify; + import java.io.BufferedReader; import java.io.File; import java.io.FileReader; @@ -42,6 +46,7 @@ import org.apache.hadoop.util.Shell; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; @@ -59,6 +64,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.SerializedException; +import org.apache.hadoop.yarn.api.records.SignalContainerCommand; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -68,6 +74,7 @@ 
import org.apache.hadoop.yarn.security.NMTokenIdentifier; import org.apache.hadoop.yarn.server.api.ResourceManagerConstants; import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent; +import org.apache.hadoop.yarn.server.nodemanager.CMgrSignalContainersEvent; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode; import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; @@ -807,4 +814,96 @@ public static Token createContainerToken(ContainerId cId, long rmIdentifier, containerTokenIdentifier); return containerToken; } + + // The test verifies that containerManager can process CMgrSignalContainersEvent properly + @Test + public void testContainerLaunchAndSignal() throws IOException, + InterruptedException, YarnException { + + containerManager.start(); + + File scriptFile = new File(tmpDir, "scriptFile.sh"); + PrintWriter fileWriter = new PrintWriter(scriptFile); + File processStartFile = + new File(tmpDir, "start_file.txt").getAbsoluteFile(); + fileWriter.write("\numask 0"); // So that start file is readable by the test + fileWriter.write("\necho Hello World! 
> " + processStartFile); + fileWriter.write("\necho $$ >> " + processStartFile); + fileWriter.write("\nexec sleep 1000s"); + fileWriter.close(); + + ContainerLaunchContext containerLaunchContext = + recordFactory.newRecordInstance(ContainerLaunchContext.class); + + // ////// Construct the Container-id + ContainerId cId = createContainerId(0); + + URL resource_alpha = + ConverterUtils.getYarnUrlFromPath(localFS + .makeQualified(new Path(scriptFile.getAbsolutePath()))); + LocalResource rsrc_alpha = + recordFactory.newRecordInstance(LocalResource.class); + rsrc_alpha.setResource(resource_alpha); + rsrc_alpha.setSize(-1); + rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION); + rsrc_alpha.setType(LocalResourceType.FILE); + rsrc_alpha.setTimestamp(scriptFile.lastModified()); + String destinationFile = "dest_file"; + Map localResources = + new HashMap(); + localResources.put(destinationFile, rsrc_alpha); + containerLaunchContext.setLocalResources(localResources); + List commands = new ArrayList(); + commands.add("/bin/bash"); + commands.add(scriptFile.getAbsolutePath()); + containerLaunchContext.setCommands(commands); + StartContainerRequest scRequest = + StartContainerRequest.newInstance( + containerLaunchContext, + createContainerToken(cId, DUMMY_RM_IDENTIFIER, context.getNodeId(), + user, context.getContainerTokenSecretManager())); + List list = new ArrayList(); + list.add(scRequest); + StartContainersRequest allRequests = + StartContainersRequest.newInstance(list); + containerManager.startContainers(allRequests); + + int timeoutSecs = 0; + while (!processStartFile.exists() && timeoutSecs++ < 20) { + Thread.sleep(1000); + LOG.info("Waiting for process start-file to be created"); + } + Assert.assertTrue("ProcessStartFile doesn't exist!", + processStartFile.exists()); + + // Simulate NodeStatusUpdaterImpl sending CMgrSignalContainersEvent + SignalContainerRequest signalContainerRequest = + recordFactory.newRecordInstance(SignalContainerRequest.class); + 
signalContainerRequest.setContainerId(cId); + signalContainerRequest.setCommand(SignalContainerCommand.OUTPUT_THREAD_DUMP); + List reqs = new ArrayList(); + reqs.add(signalContainerRequest); + containerManager.handle(new CMgrSignalContainersEvent(reqs)); + + verify(exec).signalContainer(anyString(), anyString(), any(SignalContainerCommand.class)); + + List containerIds = new ArrayList(); + containerIds.add(cId); + StopContainersRequest stopRequest = + StopContainersRequest.newInstance(containerIds); + containerManager.stopContainers(stopRequest); + + + BaseContainerManagerTest.waitForContainerState(containerManager, cId, + ContainerState.COMPLETE); + + GetContainerStatusesRequest gcsRequest = + GetContainerStatusesRequest.newInstance(containerIds); + ContainerStatus containerStatus = + containerManager.getContainerStatuses(gcsRequest).getContainerStatuses().get(0); + + Assert.assertEquals(ExitCode.TERMINATED.getExitCode(), + containerStatus.getExitStatus()); + + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java index 1102ebb..9afc014 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java @@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.api.records.Resource; 
+import org.apache.hadoop.yarn.api.records.SignalContainerCommand; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -61,7 +62,6 @@ import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode; -import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest; import org.apache.hadoop.yarn.server.utils.BuilderUtils; @@ -287,7 +287,7 @@ public void testContainerKillOnMemoryOverflow() throws IOException, // Assert that the process is not alive anymore Assert.assertFalse("Process is still alive!", exec.signalContainer(user, - pid, Signal.NULL)); + pid, SignalContainerCommand.NULL)); } @Test(timeout = 20000) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 1d40320..984f6be 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -394,7 +394,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) NodeHeartbeatResponse nodeHeartBeatResponse = YarnServerBuilderUtils .newNodeHeartbeatResponse(lastNodeHeartbeatResponse. 
getResponseId() + 1, NodeAction.NORMAL, null, null, null, null, - nextHeartBeatInterval); + nextHeartBeatInterval, null); rmNode.updateNodeHeartbeatResponseForCleanup(nodeHeartBeatResponse); populateKeys(request, nodeHeartBeatResponse);