commit 70763450ec0d44262e8167bfa2dc270ecd8dd461 Author: Gera Shegalov Date: Sun May 18 23:37:34 2014 -0700 YARN-1515.v08.patch diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java index f2c1841..b191ec2 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java @@ -30,6 +30,9 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.junit.Assert; import org.apache.commons.logging.Log; @@ -404,6 +407,12 @@ public GetContainerStatusesResponse getContainerStatuses( } @Override + public SignalContainerResponse signalContainer( + SignalContainerRequest request) throws YarnException, IOException { + return SignalContainerResponse.newInstance(null); + } + + @Override public StartContainersResponse startContainers(StartContainersRequest requests) throws IOException { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java index 6f21c87..907f2bc 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java @@ -47,6 +47,8 @@ import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest; @@ -443,6 +445,12 @@ public GetContainerStatusesResponse getContainerStatuses( GetContainerStatusesRequest request) throws IOException { return null; } + + @Override + public SignalContainerResponse signalContainer( + SignalContainerRequest request) throws YarnException, IOException { + return null; + } } @SuppressWarnings("serial") diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ContainerManagementProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ContainerManagementProtocol.java index 7aa43df..40f6355 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ContainerManagementProtocol.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ContainerManagementProtocol.java @@ -24,6 +24,8 @@ import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; @@ -170,4 +172,7 @@ StopContainersResponse stopContainers(StopContainersRequest request) GetContainerStatusesResponse getContainerStatuses( GetContainerStatusesRequest request) throws YarnException, IOException; + + SignalContainerResponse signalContainer(SignalContainerRequest request) + throws YarnException, IOException; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SignalContainerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SignalContainerRequest.java new file mode 100644 index 0000000..8f290c5 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SignalContainerRequest.java @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalProto; +import org.apache.hadoop.yarn.util.Records; + +import java.util.Arrays; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +@Public +@Unstable +public abstract class SignalContainerRequest { + + @Public + @Unstable + public static SignalContainerRequest newInstance(ContainerId containerId, + int pause, Signal... signals) { + return newInstance(containerId, pause, Arrays.asList(signals)); + } + + @Public + @Unstable + public static SignalContainerRequest newInstance(ContainerId containerId, + int pause, Iterable signals) { + final SignalContainerRequest request = + Records.newRecord(SignalContainerRequest.class); + request.setContainerId(containerId); + request.setPause(pause); + request.setSignals(signals); + return request; + } + + @Public + @Unstable + public abstract void setContainerId(ContainerId containerId); + + @Public + @Unstable + public abstract ContainerId getContainerId(); + + @Public + @Unstable + public abstract void setPause(int pause); + + @Public + @Unstable + public abstract int getPause(); + + @Public + @Unstable + public abstract void setSignals(Iterable signals); + + @Public + @Unstable + public abstract Iterable getSignals(); + + + /** + * The constants for the signals. + */ + public enum Signal { + NULL(SignalProto.NULL.getNumber(), SignalProto.NULL.name()), + QUIT(SignalProto.SIGQUIT.getNumber(), SignalProto.SIGQUIT.name()), + KILL(SignalProto.SIGKILL.getNumber(), SignalProto.SIGKILL.name()), + TERM(SignalProto.SIGTERM.getNumber(), SignalProto.SIGTERM.name()); + + private static final Map CACHE = + new ConcurrentHashMap(); + + private final int value; + private final String str; + private Signal(int value, String str) { + this.str = str; + this.value = value; + } + public int getValue() { + return value; + } + @Override + public String toString() { + return str; + } + + public static Signal valueOf(SignalProto signalProto) { + final int sigNum = signalProto.getNumber(); + Signal signal = CACHE.get(sigNum); + if (signal == null) { + switch (sigNum) { + case SignalProto.NULL_VALUE: + signal = NULL; + break; + case SignalProto.SIGQUIT_VALUE: + signal = QUIT; + break; + case SignalProto.SIGKILL_VALUE: + signal = KILL; + break; + case SignalProto.SIGTERM_VALUE: + signal = TERM; + break; + default: + throw new IllegalArgumentException(sigNum + ": Infeasible signal"); + } + CACHE.put(sigNum, signal); + } + return signal; + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SignalContainerResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SignalContainerResponse.java new file mode 100644 index 0000000..9b31e0f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SignalContainerResponse.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.SerializedException; +import org.apache.hadoop.yarn.util.Records; + +@Public +@Unstable +public abstract class SignalContainerResponse { + + @Public + @Unstable + public static SignalContainerResponse newInstance( + Iterable exceptions) { + final SignalContainerResponse response = + Records.newRecord(SignalContainerResponse.class); + response.setExceptions(exceptions); + return response; + } + + public abstract void setExceptions(Iterable exceptions); + + public abstract Iterable getExceptions(); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/containermanagement_protocol.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/containermanagement_protocol.proto index 7b1647b..d0e75bc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/containermanagement_protocol.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/containermanagement_protocol.proto @@ -34,4 +34,5 @@ service ContainerManagementProtocolService { rpc startContainers(StartContainersRequestProto) returns (StartContainersResponseProto); rpc stopContainers(StopContainersRequestProto) returns (StopContainersResponseProto); rpc getContainerStatuses(GetContainerStatusesRequestProto) returns (GetContainerStatusesResponseProto); + rpc signalContainer(SignalContainerRequestProto) returns (SignalContainerResponseProto); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index a1f6d2e..a1f26bc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -208,6 +208,22 @@ message StopContainerRequestProto { message StopContainerResponseProto { } +enum SignalProto { + NULL = 0; + SIGQUIT = 3; + SIGKILL = 9; + SIGTERM = 15; +} +message SignalContainerRequestProto { + optional ContainerIdProto container_id = 1; + repeated SignalProto signals = 2; + optional int32 pause = 3; +} + +message SignalContainerResponseProto { + repeated SerializedExceptionProto exception = 1; +} + message GetContainerStatusRequestProto { optional ContainerIdProto container_id = 1; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagementProtocolPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagementProtocolPBClientImpl.java index 15397e3..55b4425 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagementProtocolPBClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagementProtocolPBClientImpl.java @@ -28,16 +28,21 @@ import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.authorize.Service; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.ContainerManagementProtocolPB; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerStatusesRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerStatusesResponsePBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainersRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainersResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StopContainersRequestPBImpl; @@ -46,6 +51,8 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.ipc.RPCUtil; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerStatusesRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainersRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.StopContainersRequestProto; @@ -128,4 +135,18 @@ public GetContainerStatusesResponse getContainerStatuses( return null; } } + + @Override + public SignalContainerResponse signalContainer(SignalContainerRequest request) + throws YarnException, IOException { + SignalContainerRequestProto requestProto = + ((SignalContainerRequestPBImpl) request).getProto(); + try { + return new SignalContainerResponsePBImpl( + proxy.signalContainer(null, requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + return null; + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ContainerManagementProtocolPBServiceImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ContainerManagementProtocolPBServiceImpl.java index 2d33e69..c024b4e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ContainerManagementProtocolPBServiceImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ContainerManagementProtocolPBServiceImpl.java @@ -24,10 +24,13 @@ import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.ContainerManagementProtocolPB; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerStatusesRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerStatusesResponsePBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainersRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainersResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StopContainersRequestPBImpl; @@ -35,6 +38,8 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerStatusesRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerStatusesResponseProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainersRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainersResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.StopContainersRequestProto; @@ -94,4 +99,19 @@ public GetContainerStatusesResponseProto getContainerStatuses( throw new ServiceException(e); } } + + @Override + public SignalContainerResponseProto signalContainer(RpcController arg0, + SignalContainerRequestProto proto) throws ServiceException { + final SignalContainerRequestPBImpl request = + new SignalContainerRequestPBImpl(proto); + try { + final SignalContainerResponse response = real.signalContainer(request); + return ((SignalContainerResponsePBImpl)response).getProto(); + } catch (YarnException e) { + throw new ServiceException(e); + } catch (IOException e) { + throw new ServiceException(e); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SignalContainerRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SignalContainerRequestPBImpl.java new file mode 100644 index 0000000..f33f1653 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SignalContainerRequestPBImpl.java @@ -0,0 +1,172 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; + +import com.google.protobuf.TextFormat; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProtoOrBuilder; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalProto; + +import java.util.ArrayList; +import java.util.List; + + +public class SignalContainerRequestPBImpl extends SignalContainerRequest { + private SignalContainerRequestProto proto = + SignalContainerRequestProto.getDefaultInstance(); + private SignalContainerRequestProto.Builder builder; + boolean viaProto; + + private ContainerId containerId; + + private Iterable signals; + + private int pause; + + public SignalContainerRequestPBImpl() { + builder = SignalContainerRequestProto.newBuilder(); + } + + public SignalContainerRequestPBImpl(SignalContainerRequestProto proto) { + this.proto = proto; + viaProto = true; + } + + public SignalContainerRequestProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public void setContainerId(ContainerId containerId) { + maybeInitBuilder(); + if (containerId == null) { + builder.clearContainerId(); + } + this.containerId = containerId; + } + + @Override + public ContainerId getContainerId() { + if (containerId == null) { + final SignalContainerRequestProtoOrBuilder p = viaProto ? proto : builder; + containerId = new ContainerIdPBImpl(p.getContainerId()); + } + return containerId; + } + + @Override + public void setPause(int pause) { + maybeInitBuilder(); + if (pause == 0) { + builder.clearPause(); + } + this.pause = pause; + } + + @Override + public int getPause() { + if (pause == 0) { + final SignalContainerRequestProtoOrBuilder p = viaProto ? proto : builder; + pause = p.getPause(); + } + return pause; + } + + @Override + public void setSignals(Iterable signals) { + maybeInitBuilder(); + if (signals == null) { + builder.clearSignals(); + } + this.signals = signals; + } + + @Override + public Iterable getSignals() { + if (signals == null) { + final SignalContainerRequestProtoOrBuilder p = viaProto ? proto : builder; + final List sigList = new ArrayList(p.getSignalsCount()); + for (SignalProto sigProto : p.getSignalsList()) { + sigList.add(Signal.valueOf(sigProto)); + } + signals = sigList; + } + return signals; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) + return false; + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + + private void mergeLocalToBuilder() { + maybeInitBuilder(); + builder.clearContainerId(); + if (containerId != null) { + builder.setContainerId(((ContainerIdPBImpl)containerId).getProto()); + } + builder.clearSignals(); + if (signals != null) { + for (Signal signal : signals) { + builder.addSignals(SignalProto.valueOf(signal.getValue())); + } + } + builder.clearPause(); + if (pause != 0) { + builder.setPause(pause); + } + } + + private void mergeLocalToProto() { + if (viaProto) { + maybeInitBuilder(); + } + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = SignalContainerRequestProto.newBuilder(proto); + } + viaProto = false; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SignalContainerResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SignalContainerResponsePBImpl.java new file mode 100644 index 0000000..d7462ed --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SignalContainerResponsePBImpl.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; + +import com.google.protobuf.TextFormat; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse; +import org.apache.hadoop.yarn.api.records.SerializedException; +import org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl; +import org.apache.hadoop.yarn.proto.YarnProtos.SerializedExceptionProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerResponseProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerResponseProtoOrBuilder; + +import java.util.ArrayList; +import java.util.List; + + +public class SignalContainerResponsePBImpl extends SignalContainerResponse { + private SignalContainerResponseProto proto = + SignalContainerResponseProto.getDefaultInstance(); + private SignalContainerResponseProto.Builder builder; + boolean viaProto; + + private Iterable exceptions; + + public SignalContainerResponsePBImpl() { + builder = SignalContainerResponseProto.newBuilder(); + } + + public SignalContainerResponsePBImpl(SignalContainerResponseProto proto) { + this.proto = proto; + viaProto = true; + } + + public SignalContainerResponseProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public void setExceptions(Iterable exceptions) { + maybeInitBuilder(); + if (exceptions == null) { + builder.clearException(); + } + this.exceptions = exceptions; + } + + @Override + public Iterable getExceptions() { + if (exceptions == null) { + final SignalContainerResponseProtoOrBuilder p = + viaProto ? proto : builder; + final List exList = + new ArrayList(p.getExceptionCount()); + for (SerializedExceptionProto sep : p.getExceptionList()) { + exList.add(new SerializedExceptionPBImpl(sep)); + } + exceptions = exList; + } + return exceptions; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) + return false; + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + + private void mergeLocalToBuilder() { + maybeInitBuilder(); + builder.clearException(); + if (exceptions != null) { + for (SerializedException e : exceptions) { + builder.addException(((SerializedExceptionPBImpl)e).getProto()); + } + } + } + + private void mergeLocalToProto() { + if (viaProto) { + maybeInitBuilder(); + } + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = SignalContainerResponseProto.newBuilder(proto); + } + viaProto = false; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java index b15862f..cd71978 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java @@ -24,6 +24,8 @@ import java.util.ArrayList; import java.util.List; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse; import org.junit.Assert; import org.apache.commons.logging.Log; @@ -165,5 +167,13 @@ public GetContainerStatusesResponse getContainerStatuses( GetContainerStatusesResponse.newInstance(list, null); return null; } + + @Override + public SignalContainerResponse signalContainer( + SignalContainerRequest request) throws YarnException, IOException { + final Exception e = new Exception("Dummy function", new Exception( + "Dummy function cause")); + throw new YarnException(e); + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java index fc87443..b9f3878 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java @@ -38,6 +38,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; @@ -218,6 +220,14 @@ public StopContainersResponse stopContainers(StopContainersRequest request) new Exception(EXCEPTION_CAUSE)); throw new YarnException(e); } + + @Override + public SignalContainerResponse signalContainer( + SignalContainerRequest request) throws YarnException { + final Exception e = new Exception(EXCEPTION_MSG, + new Exception(EXCEPTION_CAUSE)); + throw new YarnException(e); + } } public static ContainerTokenIdentifier newContainerTokenIdentifier( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java index ee72fbc..63da78d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java @@ -35,14 +35,12 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.util.Shell.ShellCommandExecutor; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest.Signal; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent; import org.apache.hadoop.yarn.server.nodemanager.util.ProcessIdFileReader; import org.apache.hadoop.util.Shell; -import org.apache.hadoop.util.StringUtils; public abstract class ContainerExecutor implements Configurable { @@ -120,8 +118,7 @@ public abstract int launchContainer(Container container, List logDirs) throws IOException; public abstract boolean signalContainer(String user, String pid, - Signal signal) - throws IOException; + Signal signal) throws IOException; public abstract void deleteAsUser(String user, Path subDir, Path... basedirs) throws IOException, InterruptedException; @@ -145,27 +142,6 @@ public String toString() { } } - /** - * The constants for the signals. - */ - public enum Signal { - NULL(0, "NULL"), QUIT(3, "SIGQUIT"), - KILL(9, "SIGKILL"), TERM(15, "SIGTERM"); - private final int value; - private final String str; - private Signal(int value, String str) { - this.str = str; - this.value = value; - } - public int getValue() { - return value; - } - @Override - public String toString() { - return str; - } - } - protected void logOutput(String output) { String shExecOutput = output; if (shExecOutput != null) { @@ -291,41 +267,4 @@ public String getProcessId(ContainerId containerID) { } return pid; } - - public static class DelayedProcessKiller extends Thread { - private Container container; - private final String user; - private final String pid; - private final long delay; - private final Signal signal; - private final ContainerExecutor containerExecutor; - - public DelayedProcessKiller(Container container, String user, String pid, - long delay, Signal signal, ContainerExecutor containerExecutor) { - this.container = container; - this.user = user; - this.pid = pid; - this.delay = delay; - this.signal = signal; - this.containerExecutor = containerExecutor; - setName("Task killer for " + pid); - setDaemon(false); - } - @Override - public void run() { - try { - Thread.sleep(delay); - containerExecutor.signalContainer(user, pid, signal); - } catch (InterruptedException e) { - return; - } catch (IOException e) { - String message = "Exception when user " + user + " killing task " + pid - + " in DelayedProcessKiller: " + StringUtils.stringifyException(e); - LOG.warn(message); - container.handle(new ContainerDiagnosticsUpdateEvent(container - .getContainerId(), message)); - } - } - } - } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java index 9e2e111..7a01f71 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java @@ -42,6 +42,7 @@ import org.apache.hadoop.util.Shell.ExitCodeException; import org.apache.hadoop.util.Shell.ShellCommandExecutor; import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest.Signal; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java index cbdcb13..26196fd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java @@ -36,6 +36,7 @@ import org.apache.hadoop.util.Shell.ShellCommandExecutor; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest.Signal; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index dd3deb3..5649161 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -57,6 +57,8 @@ import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; @@ -84,6 +86,7 @@ import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent; import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedContainersEvent; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest.Signal; import org.apache.hadoop.yarn.server.nodemanager.ContainerManagerEvent; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; @@ -103,7 +106,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerSignalEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService; @@ -698,9 +701,14 @@ public StopContainersResponse stopContainers(StopContainersRequest requests) new HashMap(); UserGroupInformation remoteUgi = getRemoteUgi(); NMTokenIdentifier identifier = selectNMTokenIdentifier(remoteUgi); + final int pauseMillis = (int)getConfig().getLong( + YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS, + YarnConfiguration.DEFAULT_NM_SLEEP_DELAY_BEFORE_SIGKILL_MS); for (ContainerId id : requests.getContainerIds()) { try { - stopContainerInternal(identifier, id); + final SignalContainerRequest request = SignalContainerRequest + .newInstance(id, pauseMillis, Signal.TERM, Signal.KILL); + signalContainerInternal(identifier, request); succeededRequests.add(id); } catch (YarnException e) { failedRequests.put(id, SerializedException.newInstance(e)); @@ -710,14 +718,24 @@ public StopContainersResponse stopContainers(StopContainersRequest requests) .newInstance(succeededRequests, failedRequests); } + @Override + public SignalContainerResponse signalContainer(SignalContainerRequest request) + throws YarnException, IOException { + final NMTokenIdentifier tokenIdentifier = + selectNMTokenIdentifier(getRemoteUgi()); + signalContainerInternal(tokenIdentifier, request); + return SignalContainerResponse.newInstance(null); + } + @SuppressWarnings("unchecked") - private void stopContainerInternal(NMTokenIdentifier nmTokenIdentifier, - ContainerId containerID) throws YarnException { + private void signalContainerInternal(NMTokenIdentifier nmTokenIdentifier, + SignalContainerRequest request) throws YarnException { + final ContainerId containerID = request.getContainerId(); String containerIDStr = containerID.toString(); Container container = this.context.getContainers().get(containerID); LOG.info("Stopping container with container Id: " + containerIDStr); authorizeGetAndStopContainerRequest(containerID, container, true, - nmTokenIdentifier); + nmTokenIdentifier); if (container == null) { if (!nodeStatusUpdater.isContainerRecentlyStopped(containerID)) { @@ -725,9 +743,8 @@ private void stopContainerInternal(NMTokenIdentifier nmTokenIdentifier, + " is not handled by this NodeManager"); } } else { - dispatcher.getEventHandler().handle( - new ContainerKillEvent(containerID, - "Container killed by the ApplicationMaster.")); + dispatcher.getEventHandler().handle(ContainerSignalEvent + .getInstance(request, "Container killed by the ApplicationMaster.")); NMAuditLogger.logSuccess(container.getUser(), AuditConstants.STOP_CONTAINER, "ContainerManageImpl", containerID @@ -872,10 +889,13 @@ public void handle(ContainerManagerEvent event) { case FINISH_CONTAINERS: CMgrCompletedContainersEvent containersFinishedEvent = (CMgrCompletedContainersEvent) event; + final int pauseMillis = (int)getConfig().getLong( + YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS, + YarnConfiguration.DEFAULT_NM_SLEEP_DELAY_BEFORE_SIGKILL_MS); for (ContainerId container : containersFinishedEvent .getContainersToCleanup()) { this.dispatcher.getEventHandler().handle( - new ContainerKillEvent(container, + ContainerSignalEvent.getInstance(container, pauseMillis, "Container Killed by ResourceManager")); } break; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java index 21d2f91..83d07f8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java @@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy; import org.apache.hadoop.yarn.server.nodemanager.Context; @@ -38,7 +39,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerInitEvent; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerSignalEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType; @@ -374,8 +375,10 @@ public ApplicationState transition(ApplicationImpl app, // application. for (ContainerId containerID : app.containers.keySet()) { app.dispatcher.getEventHandler().handle( - new ContainerKillEvent(containerID, - "Container killed on application-finish event: " + appEvent.getDiagnostic())); + ContainerSignalEvent.getInstance(containerID, + (int)YarnConfiguration.DEFAULT_NM_SLEEP_DELAY_BEFORE_SIGKILL_MS, + "Container killed on application-finish event: " + + appEvent.getDiagnostic())); } return ApplicationState.FINISHING_CONTAINERS_WAIT; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java index 5622f8c..ab976ce 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java @@ -23,6 +23,7 @@ // Producer: ContainerManager INIT_CONTAINER, KILL_CONTAINER, + SIGNAL_CONTAINER, UPDATE_DIAGNOSTICS_MSG, CONTAINER_DONE, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index 50653f5..d9a4029 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationContainerFinishedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerRemoteCleanupEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent; @@ -128,6 +129,8 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, private static final ContainerDiagnosticsUpdateTransition UPDATE_DIAGNOSTICS_TRANSITION = new ContainerDiagnosticsUpdateTransition(); + private static final KillTransition KILL_TRANSITION = new KillTransition(); + // State Machine for each container. private static StateMachineFactory @@ -192,7 +195,7 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, ContainerEventType.UPDATE_DIAGNOSTICS_MSG, UPDATE_DIAGNOSTICS_TRANSITION) .addTransition(ContainerState.LOCALIZED, ContainerState.KILLING, - ContainerEventType.KILL_CONTAINER, new KillTransition()) + ContainerEventType.KILL_CONTAINER, KILL_TRANSITION) // From RUNNING State .addTransition(ContainerState.RUNNING, @@ -207,7 +210,7 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, ContainerEventType.UPDATE_DIAGNOSTICS_MSG, UPDATE_DIAGNOSTICS_TRANSITION) .addTransition(ContainerState.RUNNING, ContainerState.KILLING, - ContainerEventType.KILL_CONTAINER, new KillTransition()) + ContainerEventType.KILL_CONTAINER, KILL_TRANSITION) .addTransition(ContainerState.RUNNING, ContainerState.EXITED_WITH_FAILURE, ContainerEventType.CONTAINER_KILLED_ON_REQUEST, new KilledExternallyTransition()) @@ -668,8 +671,7 @@ public void transition(ContainerImpl container, ContainerEvent event) { if (clCleanupRequired) { container.dispatcher.getEventHandler().handle( - new ContainersLauncherEvent(container, - ContainersLauncherEventType.CLEANUP_CONTAINER)); + new ContainerRemoteCleanupEvent(container)); } container.cleanup(); @@ -703,8 +705,7 @@ public void transition(ContainerImpl container, ContainerEvent event) { if (clCleanupRequired) { container.dispatcher.getEventHandler().handle( - new ContainersLauncherEvent(container, - ContainersLauncherEventType.CLEANUP_CONTAINER)); + new ContainerRemoteCleanupEvent(container)); } container.cleanup(); @@ -760,7 +761,7 @@ public void transition(ContainerImpl container, ContainerEvent event) { // resources. container.cleanup(); container.metrics.endInitingContainer(); - ContainerKillEvent killEvent = (ContainerKillEvent) event; + ContainerSignalEvent killEvent = (ContainerSignalEvent)event; container.exitCode = ExitCode.TERMINATED.getExitCode(); container.diagnostics.append(killEvent.getDiagnostic()).append("\n"); container.diagnostics.append("Container is killed before being launched.\n"); @@ -799,11 +800,10 @@ public void transition(ContainerImpl container, ContainerEvent event) { SingleArcTransition { @Override public void transition(ContainerImpl container, ContainerEvent event) { + final ContainerSignalEvent killEvent = (ContainerSignalEvent)event; // Kill the process/process-grp container.dispatcher.getEventHandler().handle( - new ContainersLauncherEvent(container, - ContainersLauncherEventType.CLEANUP_CONTAINER)); - ContainerKillEvent killEvent = (ContainerKillEvent) event; + new ContainerRemoteCleanupEvent(container, killEvent)); container.diagnostics.append(killEvent.getDiagnostic()).append("\n"); } } @@ -858,7 +858,7 @@ public void transition(ContainerImpl container, ContainerEvent event) { static class KillOnNewTransition extends ContainerDoneTransition { @Override public void transition(ContainerImpl container, ContainerEvent event) { - ContainerKillEvent killEvent = (ContainerKillEvent) event; + ContainerSignalEvent killEvent = (ContainerSignalEvent)event; container.exitCode = ExitCode.TERMINATED.getExitCode(); container.diagnostics.append(killEvent.getDiagnostic()).append("\n"); container.diagnostics.append("Container is killed before being launched.\n"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerKillEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerKillEvent.java deleted file mode 100644 index 313b6a8..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerKillEvent.java +++ /dev/null @@ -1,35 +0,0 @@ -/** -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -package org.apache.hadoop.yarn.server.nodemanager.containermanager.container; - -import org.apache.hadoop.yarn.api.records.ContainerId; - -public class ContainerKillEvent extends ContainerEvent { - - private final String diagnostic; - - public ContainerKillEvent(ContainerId cID, String diagnostic) { - super(cID, ContainerEventType.KILL_CONTAINER); - this.diagnostic = diagnostic; - } - - public String getDiagnostic() { - return this.diagnostic; - } -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerSignalEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerSignalEvent.java new file mode 100644 index 0000000..f38112e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerSignalEvent.java @@ -0,0 +1,67 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.container; + + +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest.Signal; +import org.apache.hadoop.yarn.api.records.ContainerId; + +public class ContainerSignalEvent extends ContainerEvent { + private final SignalContainerRequest signalRequest; + private final String diagnostic; + + private ContainerSignalEvent(ContainerEventType type, + SignalContainerRequest signalRequest, + String diagnostic) { + super(signalRequest.getContainerId(), type); + this.signalRequest = signalRequest; + this.diagnostic = diagnostic; + } + + public String getDiagnostic() { + return this.diagnostic; + } + + public SignalContainerRequest getSignalRequest() { + return signalRequest; + } + + public static ContainerSignalEvent getInstance(ContainerId containerId, + int pauseMillis, String msg) { + return new ContainerSignalEvent(ContainerEventType.KILL_CONTAINER, + SignalContainerRequest.newInstance(containerId, pauseMillis, + Signal.TERM, Signal.KILL), msg); + } + + public static ContainerSignalEvent getInstance(SignalContainerRequest request, + String msg) { + boolean impliesKill = false; + for (Signal signal : request.getSignals()) { + if (signal == Signal.KILL || signal == Signal.TERM) { + impliesKill = true; + break; + } + } + final ContainerEventType type = impliesKill + ? ContainerEventType.KILL_CONTAINER + : ContainerEventType.SIGNAL_CONTAINER; + return new ContainerSignalEvent(type, request, msg); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java index e252e35..3909149 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java @@ -30,6 +30,7 @@ import java.util.ArrayList; import java.util.EnumSet; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -49,6 +50,7 @@ import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; @@ -56,9 +58,8 @@ import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.ipc.RPCUtil; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; -import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.DelayedProcessKiller; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode; -import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest.Signal; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; @@ -68,6 +69,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerSignalEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService; @@ -99,7 +101,6 @@ private volatile AtomicBoolean shouldLaunchContainer = new AtomicBoolean(false); private volatile AtomicBoolean completed = new AtomicBoolean(false); - private long sleepDelayBeforeSigKill = 250; private long maxKillWaitTime = 2000; private Path pidFilePath = null; @@ -118,9 +119,6 @@ public ContainerLaunch(Context context, Configuration configuration, this.dispatcher = dispatcher; this.dirsHandler = dirsHandler; this.containerManager = containerManager; - this.sleepDelayBeforeSigKill = - conf.getLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS, - YarnConfiguration.DEFAULT_NM_SLEEP_DELAY_BEFORE_SIGKILL_MS); this.maxKillWaitTime = conf.getLong(YarnConfiguration.NM_PROCESS_KILL_WAIT_MS, YarnConfiguration.DEFAULT_NM_PROCESS_KILL_WAIT_MS); @@ -352,19 +350,18 @@ public Integer call() { * @throws IOException */ @SuppressWarnings("unchecked") // dispatcher not typed - public void cleanupContainer() throws IOException { + public void signalContainer(ContainerSignalEvent killEvent) throws IOException { ContainerId containerId = container.getContainerId(); String containerIdStr = ConverterUtils.toString(containerId); - LOG.info("Cleaning up container " + containerIdStr); + LOG.info("Signaling container " + containerIdStr); // launch flag will be set to true if process already launched boolean alreadyLaunched = !shouldLaunchContainer.compareAndSet(false, true); if (!alreadyLaunched) { LOG.info("Container " + containerIdStr + " not launched." - + " No cleanup needed to be done"); + + " No signaling needed to be done"); return; } - LOG.debug("Marking container " + containerIdStr + " as inactive"); // this should ensure that if the container process has not launched // by this time, it will never be launched @@ -393,20 +390,22 @@ public void cleanupContainer() throws IOException { + " as user " + user + " for container " + containerIdStr); - final Signal signal = sleepDelayBeforeSigKill > 0 - ? Signal.TERM - : Signal.KILL; - - boolean result = exec.signalContainer(user, processId, signal); - - LOG.debug("Sent signal " + signal + " to pid " + processId - + " as user " + user - + " for container " + containerIdStr - + ", result=" + (result? "success" : "failed")); - - if (sleepDelayBeforeSigKill > 0) { - new DelayedProcessKiller(container, user, - processId, sleepDelayBeforeSigKill, Signal.KILL, exec).start(); + final SignalContainerRequest signalRequest = + killEvent.getSignalRequest(); + final int pauseMillis = signalRequest.getPause(); + final Iterator sigIter = signalRequest.getSignals().iterator(); + while (sigIter.hasNext()) { + final Signal signal = sigIter.next(); + boolean result = exec.signalContainer(user, processId, signal); + LOG.debug("Sent signal " + signal + " to pid " + processId + + " as user " + user + + " for container " + containerIdStr + + ", result=" + (result? "success" : "failed")); + if (sigIter.hasNext()) { + try { + Thread.sleep(pauseMillis); + } catch (InterruptedException ignored) {} + } } } } catch (Exception e) { @@ -417,10 +416,12 @@ public void cleanupContainer() throws IOException { dispatcher.getEventHandler().handle( new ContainerDiagnosticsUpdateEvent(containerId, message)); } finally { - // cleanup pid file if present - if (pidFilePath != null) { - FileContext lfs = FileContext.getLocalFSFileContext(); - lfs.delete(pidFilePath, false); + if (killEvent.getType() == ContainerEventType.KILL_CONTAINER) { + // cleanup pid file if present + if (pidFilePath != null) { + FileContext lfs = FileContext.getLocalFSFileContext(); + lfs.delete(pidFilePath, false); + } } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerRemoteCleanupEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerRemoteCleanupEvent.java new file mode 100644 index 0000000..ff38a13 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerRemoteCleanupEvent.java @@ -0,0 +1,44 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher; + +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerSignalEvent; + +public class ContainerRemoteCleanupEvent extends ContainersLauncherEvent { + private final ContainerSignalEvent killEvent; + + public ContainerRemoteCleanupEvent(Container container) { + this(container, + ContainerSignalEvent.getInstance(container.getContainerId(), + (int)YarnConfiguration.DEFAULT_NM_SLEEP_DELAY_BEFORE_SIGKILL_MS, + "")); + } + + public ContainerRemoteCleanupEvent(Container container, + ContainerSignalEvent killEvent) { + super(container, ContainersLauncherEventType.CLEANUP_CONTAINER); + this.killEvent = killEvent; + } + + public ContainerSignalEvent getKillEvent() { + return killEvent; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java index ce865e3..f6e123b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java @@ -24,7 +24,6 @@ import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Future; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -32,21 +31,17 @@ import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.service.AbstractService; -import org.apache.hadoop.util.Shell; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; -import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerSignalEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService; import com.google.common.annotations.VisibleForTesting; @@ -135,7 +130,9 @@ public void handle(ContainersLauncherEvent event) { // Cleanup a container whether it is running/killed/completed, so that // no sub-processes are alive. try { - launcher.cleanupContainer(); + final ContainerSignalEvent killEvent = + ((ContainerRemoteCleanupEvent)event).getKillEvent(); + launcher.signalContainer(killEvent); } catch (IOException e) { LOG.warn("Got exception while cleaning container " + containerId + ". Ignoring."); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEvent.java index 38bedf2..e32ac37 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEvent.java @@ -19,7 +19,6 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher; import org.apache.hadoop.yarn.event.AbstractEvent; -import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; public class ContainersLauncherEvent diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java index b681b34..8af9aaf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java @@ -36,7 +36,7 @@ import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.Context; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerSignalEvent; import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree; import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin; @@ -315,6 +315,10 @@ public MonitoringThread() { @Override public void run() { + final int pauseMillis = + (int)conf.getLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS, + YarnConfiguration.DEFAULT_NM_SLEEP_DELAY_BEFORE_SIGKILL_MS); + while (true) { // Print the processTrees for debugging. @@ -439,8 +443,8 @@ public void run() { + " but it is not a process group leader."); } // kill the container - eventDispatcher.getEventHandler().handle( - new ContainerKillEvent(containerId, msg)); + eventDispatcher.getEventHandler().handle(ContainerSignalEvent + .getInstance(containerId, pauseMillis, msg)); it.remove(); LOG.info("Removed ProcessTree with root " + pId); } else { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutor.java index f840730..7a6f3b0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutor.java @@ -44,8 +44,8 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest.Signal; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.junit.After; import org.junit.Before; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutorWithMocks.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutorWithMocks.java index ddffa27..1ef8e80 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutorWithMocks.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutorWithMocks.java @@ -44,6 +44,7 @@ import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest.Signal; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.junit.After; @@ -271,7 +272,7 @@ public void testContainerKill() throws IOException { String appSubmitter = "nobody"; String cmd = String.valueOf( LinuxContainerExecutor.Commands.SIGNAL_CONTAINER.getValue()); - ContainerExecutor.Signal signal = ContainerExecutor.Signal.QUIT; + Signal signal = Signal.QUIT; String sigVal = String.valueOf(signal.getValue()); mockExec.signalContainer(appSubmitter, "1000", signal); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java index 73f98cb..e8b40c8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java @@ -52,7 +52,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerInitEvent; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerSignalEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent; @@ -445,8 +445,8 @@ public ContainerKillMatcher(ContainerId cId) { @Override public boolean matches(Object argument) { - if (argument instanceof ContainerKillEvent) { - ContainerKillEvent event = (ContainerKillEvent) argument; + if (argument instanceof ContainerSignalEvent) { + ContainerSignalEvent event = (ContainerSignalEvent)argument; return event.getContainerID().equals(cId); } return false; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java index 8af9518..b099172 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java @@ -898,7 +898,11 @@ public void containerFailed(int exitCode) { } public void killContainer() { - c.handle(new ContainerKillEvent(cId, "KillRequest")); + final int pauseMillis = (int)conf.getLong( + YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS, + YarnConfiguration.DEFAULT_NM_SLEEP_DELAY_BEFORE_SIGKILL_MS); + c.handle( + ContainerSignalEvent.getInstance(cId, pauseMillis, "KillRequest")); drainDispatcherEvents(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java index c8fc85a..f7b50c9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java @@ -55,6 +55,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest.Signal; import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -259,7 +261,7 @@ public void testInvalidEnvSyntaxDiagnostics() throws IOException { env.put( "APPLICATION_WORKFLOW_CONTEXT", "{\"workflowId\":\"609f91c5cd83\"," + "\"workflowName\":\"\n\ninsert table " + - "\npartition (cd_education_status)\nselect cd_demo_sk, cd_gender, " ); + "\npartition (cd_education_status)\nselect cd_demo_sk, cd_gender, "); List commands = new ArrayList(); ContainerLaunch.writeLaunchEnv(fos, env, resources, commands); fos.flush(); @@ -623,6 +625,52 @@ public void testAuxiliaryServiceHelper() throws Exception { AuxiliaryServiceHelper.getServiceDataFromEnv(serviceName, env)); } + private void startKillTestContainer(ContainerId cId, File processStartFile, + File scriptFile) throws Exception { + FileUtil.setExecutable(scriptFile, true); + + ContainerLaunchContext containerLaunchContext = + recordFactory.newRecordInstance(ContainerLaunchContext.class); + + // upload the script file so that the container can run it + URL resource_alpha = ConverterUtils.getYarnUrlFromPath(localFS + .makeQualified(new Path(scriptFile.getAbsolutePath()))); + LocalResource rsrc_alpha = + recordFactory.newRecordInstance(LocalResource.class); + rsrc_alpha.setResource(resource_alpha); + rsrc_alpha.setSize(-1); + rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION); + rsrc_alpha.setType(LocalResourceType.FILE); + rsrc_alpha.setTimestamp(scriptFile.lastModified()); + String destinationFile = "dest_file.sh"; + Map localResources = + new HashMap(); + localResources.put(destinationFile, rsrc_alpha); + containerLaunchContext.setLocalResources(localResources); + + // set up the rest of the container + List commands = Arrays.asList(Shell.getRunScriptCommand(scriptFile)); + containerLaunchContext.setCommands(commands); + Token containerToken = createContainerToken(cId); + + StartContainerRequest scRequest = + StartContainerRequest.newInstance(containerLaunchContext, + containerToken); + List list = new ArrayList(); + list.add(scRequest); + StartContainersRequest allRequests = + StartContainersRequest.newInstance(list); + containerManager.startContainers(allRequests); + + int timeoutSecs = 0; + while (!processStartFile.exists() && timeoutSecs++ < 20) { + Thread.sleep(1000); + LOG.info("Waiting for process start-file to be created"); + } + Assert.assertTrue("ProcessStartFile doesn't exist!", + processStartFile.exists()); + } + private void internalKillTest(boolean delayed) throws Exception { conf.setLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS, delayed ? 1000 : 0); @@ -655,50 +703,7 @@ private void internalKillTest(boolean delayed) throws Exception { writer.println("while true; do\nsleep 1s;\ndone"); } writer.close(); - FileUtil.setExecutable(scriptFile, true); - - ContainerLaunchContext containerLaunchContext = - recordFactory.newRecordInstance(ContainerLaunchContext.class); - - // upload the script file so that the container can run it - URL resource_alpha = - ConverterUtils.getYarnUrlFromPath(localFS - .makeQualified(new Path(scriptFile.getAbsolutePath()))); - LocalResource rsrc_alpha = - recordFactory.newRecordInstance(LocalResource.class); - rsrc_alpha.setResource(resource_alpha); - rsrc_alpha.setSize(-1); - rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION); - rsrc_alpha.setType(LocalResourceType.FILE); - rsrc_alpha.setTimestamp(scriptFile.lastModified()); - String destinationFile = "dest_file.sh"; - Map localResources = - new HashMap(); - localResources.put(destinationFile, rsrc_alpha); - containerLaunchContext.setLocalResources(localResources); - - // set up the rest of the container - List commands = Arrays.asList(Shell.getRunScriptCommand(scriptFile)); - containerLaunchContext.setCommands(commands); - Token containerToken = createContainerToken(cId); - - StartContainerRequest scRequest = - StartContainerRequest.newInstance(containerLaunchContext, - containerToken); - List list = new ArrayList(); - list.add(scRequest); - StartContainersRequest allRequests = - StartContainersRequest.newInstance(list); - containerManager.startContainers(allRequests); - - int timeoutSecs = 0; - while (!processStartFile.exists() && timeoutSecs++ < 20) { - Thread.sleep(1000); - LOG.info("Waiting for process start-file to be created"); - } - Assert.assertTrue("ProcessStartFile doesn't exist!", - processStartFile.exists()); - + startKillTestContainer(cId, processStartFile, scriptFile); // Now test the stop functionality. List containerIds = new ArrayList(); containerIds.add(cId); @@ -748,6 +753,82 @@ private void internalKillTest(boolean delayed) throws Exception { } } + public static class SleepTask { + public static void main(String[] args) throws Throwable { + if (args.length != 1) { + System.exit(1); + } + final int sleepTime = Integer.valueOf(args[0]); + System.out.printf("Sleeping for %d milliseconds\n", sleepTime); + Thread.sleep(sleepTime); + System.out.println("Exiting after sleeping"); + } + } + + @Test (timeout = 30000) + public void testThreadDumpKill() throws Exception { + containerManager.start(); + + File processStartFile = new File(tmpDir, "pid.txt").getAbsoluteFile(); + + // setup a script that can handle sigterm gracefully + File scriptFile = new File(tmpDir, "testscript.sh"); + PrintWriter writer = new PrintWriter( + new FileOutputStream(scriptFile)); + writer.println("#!/bin/bash\n\n"); + writer.println("exec $JAVA_HOME/bin/java -classpath " + + System.getProperty("java.class.path") + " \'" + + SleepTask.class.getName() + "\' 600000 >> " + processStartFile); + writer.close(); + FileUtil.setExecutable(scriptFile, true); + + // ////// Construct the Container-id + ApplicationId appId = ApplicationId.newInstance(1, 1); + ApplicationAttemptId appAttemptId = ApplicationAttemptId. + newInstance(appId, 1); + ContainerId cId = ContainerId.newInstance(appAttemptId, 0); + + startKillTestContainer(cId, processStartFile, scriptFile); + + // Now test the stop functionality. + List containerIds = new ArrayList(); + containerIds.add(cId); + containerManager.signalContainer(SignalContainerRequest.newInstance(cId, + 250, Signal.QUIT, Signal.TERM, Signal.KILL)); + BaseContainerManagerTest.waitForContainerState(containerManager, cId, + ContainerState.COMPLETE); + + // if delayed container stop sends a sigterm followed by a sigkill + // otherwise sigkill is sent immediately + GetContainerStatusesRequest gcsRequest = + GetContainerStatusesRequest.newInstance(containerIds); + + ContainerStatus containerStatus = + containerManager.getContainerStatuses(gcsRequest) + .getContainerStatuses().get(0); + Assert.assertEquals(ExitCode.TERMINATED.getExitCode(), + containerStatus.getExitStatus()); + Assert.assertFalse("Process is still alive!", + DefaultContainerExecutor.containerIsAlive(cId.toString())); + + BufferedReader reader = + new BufferedReader(new FileReader(processStartFile)); + + boolean found = false; + while (true) { + String line = reader.readLine(); + if (line == null) { + break; + } + if (line.contains("Full thread dump")) { + found = true; + break; + } + } + Assert.assertTrue("Did not find \"Full thread dump\"", found); + reader.close(); + } + @Test (timeout = 30000) public void testDelayedKill() throws Exception { internalKillTest(true); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java index 1102ebb..c7d92f7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java @@ -41,6 +41,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest.Signal; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -61,7 +62,6 @@ import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode; -import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest; import org.apache.hadoop.yarn.server.utils.BuilderUtils; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java index a1c1a40..cb365e5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java @@ -39,17 +39,20 @@ import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest.Signal; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; -import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest; -import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.SerializedException; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.ipc.YarnRPC; @@ -72,23 +75,20 @@ private ContainerManagementProtocol containerMgrProxy; - private final RMAppAttempt application; private final Configuration conf; - private final AMLauncherEventType eventType; + private final AMLauncherEvent event; private final RMContext rmContext; private final Container masterContainer; - + @SuppressWarnings("rawtypes") private final EventHandler handler; - public AMLauncher(RMContext rmContext, RMAppAttempt application, - AMLauncherEventType eventType, Configuration conf) { - this.application = application; + public AMLauncher(RMContext rmContext, AMLauncherEvent event, Configuration conf) { this.conf = conf; - this.eventType = eventType; + this.event = event; this.rmContext = rmContext; this.handler = rmContext.getDispatcher().getEventHandler(); - this.masterContainer = application.getMasterContainer(); + this.masterContainer = event.getAppAttempt().getMasterContainer(); } private void connect() throws IOException { @@ -99,6 +99,7 @@ private void connect() throws IOException { private void launch() throws IOException, YarnException { connect(); + final RMAppAttempt application = event.getAppAttempt(); ContainerId masterContainerID = masterContainer.getId(); ApplicationSubmissionContext applicationContext = application.getSubmissionContext(); @@ -128,19 +129,19 @@ private void launch() throws IOException, YarnException { } } - private void cleanup() throws IOException, YarnException { + private void cleanup(Iterable signals) + throws IOException, YarnException { connect(); ContainerId containerId = masterContainer.getId(); - List containerIds = new ArrayList(); - containerIds.add(containerId); - StopContainersRequest stopRequest = - StopContainersRequest.newInstance(containerIds); - StopContainersResponse response = - containerMgrProxy.stopContainers(stopRequest); - if (response.getFailedRequests() != null - && response.getFailedRequests().containsKey(containerId)) { - Throwable t = response.getFailedRequests().get(containerId).deSerialize(); - parseAndThrowException(t); + SignalContainerRequest signalRequest = SignalContainerRequest.newInstance( + containerId, + (int)conf.getLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS, + YarnConfiguration.DEFAULT_NM_SLEEP_DELAY_BEFORE_SIGKILL_MS), + signals); + final SignalContainerResponse response = + containerMgrProxy.signalContainer(signalRequest); + for (SerializedException e : response.getExceptions()) { + parseAndThrowException(e.deSerialize()); } } @@ -148,7 +149,7 @@ private void cleanup() throws IOException, YarnException { protected ContainerManagementProtocol getContainerMgrProxy( final ContainerId containerId) { - final NodeId node = masterContainer.getNodeId(); + final NodeId node = event.getAppAttempt().getMasterContainer().getNodeId(); final InetSocketAddress containerManagerBindAddress = NetUtils.createSocketAddrForHost(node.getHost(), node.getPort()); @@ -202,6 +203,7 @@ private ContainerLaunchContext createAMContainerLaunchContext( private void setupTokens( ContainerLaunchContext container, ContainerId containerID) throws IOException { + final RMAppAttempt application = event.getAppAttempt(); Map environment = container.getEnvironment(); environment.put(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV, application.getWebProxyBase()); @@ -237,12 +239,13 @@ private void setupTokens( @VisibleForTesting protected Token getAMRMToken() { - return application.getAMRMToken(); + return event.getAppAttempt().getAMRMToken(); } @SuppressWarnings("unchecked") public void run() { - switch (eventType) { + final RMAppAttempt application = event.getAppAttempt(); + switch (event.getType()) { case LAUNCH: try { LOG.info("Launching master" + application.getAppAttemptId()); @@ -260,7 +263,7 @@ public void run() { case CLEANUP: try { LOG.info("Cleaning master " + application.getAppAttemptId()); - cleanup(); + cleanup(((AMLauncherCleanupEvent)event).getSignals()); } catch(IOException ie) { LOG.info("Error cleaning master ", ie); } catch (YarnException e) { @@ -274,7 +277,7 @@ public void run() { } break; default: - LOG.warn("Received unknown event-type " + eventType + ". Ignoring."); + LOG.warn("Received unknown event-type " + event.getType() + ". Ignoring."); break; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncherCleanupEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncherCleanupEvent.java new file mode 100644 index 0000000..0395c44 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncherCleanupEvent.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.amlauncher; + +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest.Signal; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; + +import java.util.Arrays; + +public class AMLauncherCleanupEvent extends AMLauncherEvent { + private final Iterable signals; + + public AMLauncherCleanupEvent(RMAppAttempt appAttempt) { + this(appAttempt, false); + } + + public AMLauncherCleanupEvent(RMAppAttempt appAttempt, boolean stuck) { + super(AMLauncherEventType.CLEANUP, appAttempt); + signals = stuck + ? Arrays.asList(Signal.QUIT, Signal.TERM, Signal.KILL) + : Arrays.asList(Signal.TERM, Signal.KILL); + } + + public Iterable getSignals() { + return signals; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/ApplicationMasterLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/ApplicationMasterLauncher.java index af02b19..17e616c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/ApplicationMasterLauncher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/ApplicationMasterLauncher.java @@ -56,16 +56,15 @@ protected void serviceStart() throws Exception { super.serviceStart(); } - protected Runnable createRunnableLauncher(RMAppAttempt application, - AMLauncherEventType event) { + protected Runnable createRunnableLauncher(AMLauncherEvent event) { Runnable launcher = - new AMLauncher(context, application, event, getConfig()); + new AMLauncher(context, event, getConfig()); return launcher; } private void launch(RMAppAttempt application) { - Runnable launcher = createRunnableLauncher(application, - AMLauncherEventType.LAUNCH); + Runnable launcher = createRunnableLauncher( + new AMLauncherEvent(AMLauncherEventType.LAUNCH, application)); masterEvents.add(launcher); } @@ -102,8 +101,8 @@ public void run() { } } - private void cleanup(RMAppAttempt application) { - Runnable launcher = createRunnableLauncher(application, AMLauncherEventType.CLEANUP); + private void cleanup(AMLauncherEvent event) { + Runnable launcher = createRunnableLauncher(event); masterEvents.add(launcher); } @@ -116,7 +115,7 @@ public synchronized void handle(AMLauncherEvent appEvent) { launch(application); break; case CLEANUP: - cleanup(application); + cleanup(appEvent); default: break; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index e289ad5..d94a7b0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -67,6 +67,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherCleanupEvent; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; @@ -1172,8 +1173,7 @@ public void transition(RMAppAttemptImpl appAttempt, super.transition(appAttempt, event); // Tell the launcher to cleanup. - appAttempt.eventHandler.handle(new AMLauncherEvent( - AMLauncherEventType.CLEANUP, appAttempt)); + appAttempt.eventHandler.handle(new AMLauncherCleanupEvent(appAttempt)); } } @@ -1268,8 +1268,8 @@ public void transition(RMAppAttemptImpl appAttempt, if(!appAttempt.submissionContext.getUnmanagedAM()) { // Tell the launcher to cleanup. - appAttempt.eventHandler.handle(new AMLauncherEvent( - AMLauncherEventType.CLEANUP, appAttempt)); + appAttempt.eventHandler.handle(new AMLauncherCleanupEvent(appAttempt, + this instanceof ExpiredTransition)); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRMWithCustomAMLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRMWithCustomAMLauncher.java index 0ea2b5e..a80a34d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRMWithCustomAMLauncher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRMWithCustomAMLauncher.java @@ -28,9 +28,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncher; -import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType; +import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; public class MockRMWithCustomAMLauncher extends MockRM { @@ -50,9 +49,8 @@ public MockRMWithCustomAMLauncher(Configuration conf, protected ApplicationMasterLauncher createAMLauncher() { return new ApplicationMasterLauncher(getRMContext()) { @Override - protected Runnable createRunnableLauncher(RMAppAttempt application, - AMLauncherEventType event) { - return new AMLauncher(context, application, event, getConfig()) { + protected Runnable createRunnableLauncher(AMLauncherEvent event) { + return new AMLauncher(context, event, getConfig()) { @Override protected ContainerManagementProtocol getContainerMgrProxy( ContainerId containerId) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java index d8d474e..f814c80 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java @@ -25,6 +25,8 @@ import java.util.List; import java.util.Map; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse; import org.junit.Assert; import org.apache.commons.logging.Log; @@ -274,6 +276,12 @@ synchronized public StopContainersResponse stopContainers(StopContainersRequest } @Override + public synchronized SignalContainerResponse signalContainer( + SignalContainerRequest request) throws YarnException, IOException { + throw new YarnException("Not supported yet!"); + } + + @Override synchronized public GetContainerStatusesResponse getContainerStatuses( GetContainerStatusesRequest request) throws YarnException { List statuses = new ArrayList(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAMAuthorization.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAMAuthorization.java index c7f0d0a..43b7aea 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAMAuthorization.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAMAuthorization.java @@ -44,6 +44,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest; @@ -122,6 +124,12 @@ public GetContainerStatusesResponse getContainerStatuses( return GetContainerStatusesResponse.newInstance(null, null); } + @Override + public SignalContainerResponse signalContainer( + SignalContainerRequest request) throws YarnException, IOException { + return SignalContainerResponse.newInstance(null); + } + public Credentials getContainerCredentials() throws IOException { Credentials credentials = new Credentials(); DataInputByteBuffer buf = new DataInputByteBuffer(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java index 64e5cc9..b57565a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java @@ -31,6 +31,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; @@ -118,6 +120,12 @@ public GetContainerStatusesResponse getContainerStatuses( GetContainerStatusesRequest request) throws YarnException { return null; } + + @Override + public SignalContainerResponse signalContainer( + SignalContainerRequest request) throws YarnException, IOException { + return null; + } } @Test