diff --git hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Shell.java hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Shell.java index 8013f22..6cc7751 100644 --- hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Shell.java +++ hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Shell.java @@ -177,8 +177,20 @@ static private OSType getOSType() { /** Return a command to send a signal to a given pid */ public static String[] getSignalKillCommand(int code, String pid) { - return Shell.WINDOWS ? new String[] { Shell.WINUTILS, "task", "kill", pid } : - new String[] { "kill", "-" + code, isSetsidAvailable ? "-" + pid : pid }; + if (Shell.WINDOWS) { + final String cmd; + if (code == 3) { // QUIT + cmd = "sendBreak"; + } else if (code == 15) { // TERM + cmd = "sendCtrlC"; + } else { + cmd = "kill"; + } + return new String[] { Shell.WINUTILS, "task", cmd , pid }; + } else { + return new String[] + { "kill", "-" + code, isSetsidAvailable ? "-" + pid : pid }; + } } /** Return a regular expression string that match environment variables */ diff --git hadoop-common-project/hadoop-common/src/main/winutils/task.c hadoop-common-project/hadoop-common/src/main/winutils/task.c index 19bda96..7e61a62 100644 --- hadoop-common-project/hadoop-common/src/main/winutils/task.c +++ hadoop-common-project/hadoop-common/src/main/winutils/task.c @@ -36,6 +36,8 @@ typedef enum TaskCommandOptionType TaskCreate, TaskIsAlive, TaskKill, + TaskCtrlC, + TaskBreak, TaskProcessList } TaskCommandOption; @@ -71,6 +73,16 @@ static BOOL ParseCommandLine(__in int argc, *command = TaskKill; return TRUE; } + if (wcscmp(argv[1], L"sendCtrlC") == 0) + { + *command = TaskCtrlC; + return TRUE; + } + if (wcscmp(argv[1], L"sendBreak") == 0) + { + *command = TaskBreak; + return TRUE; + } if (wcscmp(argv[1], L"processList") == 0) { *command = TaskProcessList; @@ -89,6 +101,14 @@ static BOOL ParseCommandLine(__in int argc, return FALSE; } +BOOL NoopConsoleCtrlEventHandler( DWORD fdwCtrlType ) +{ + if ( fdwCtrlType == CTRL_BREAK_EVENT || fdwCtrlType == CTRL_C_EVENT) { + return TRUE; + } + return FALSE; +} + //---------------------------------------------------------------------------- // Function: createTask // @@ -149,6 +169,8 @@ DWORD createTask(__in PCWSTR jobObjName,__in PWSTR cmdLine) si.cb = sizeof(si); ZeroMemory( &pi, sizeof(pi) ); + SetConsoleCtrlHandler(NoopConsoleCtrlEventHandler, TRUE); + if (CreateProcess(NULL, cmdLine, NULL, NULL, TRUE, 0, NULL, NULL, &si, &pi) == 0) { err = GetLastError(); @@ -279,6 +301,56 @@ DWORD killTask(PCWSTR jobObjName) return ERROR_SUCCESS; } +DWORD sendConsoleCtrlEvent(PCWSTR jobObjName, DWORD dwCtrlEvent) +{ + HANDLE jobObject = OpenJobObject(JOB_OBJECT_QUERY, FALSE, jobObjName); + JOBOBJECT_BASIC_PROCESS_ID_LIST* jobInfo; + DWORD i; + DWORD pid; + + if(jobObject == NULL) + { + DWORD err = GetLastError(); + if(err == ERROR_FILE_NOT_FOUND) + { + // job object does not exist. assume its not alive + return ERROR_SUCCESS; + } + return err; + } + + jobInfo = (JOBOBJECT_BASIC_PROCESS_ID_LIST*) LocalAlloc(LPTR, sizeof (JOBOBJECT_BASIC_PROCESS_ID_LIST) + sizeof(ULONG)*100); + + if(QueryInformationJobObject(jobObject, JobObjectBasicProcessIdList, jobInfo, sizeof (JOBOBJECT_BASIC_PROCESS_ID_LIST) + sizeof(ULONG)*100, NULL) == 0) + { + return GetLastError(); + } + + if (jobInfo->NumberOfProcessIdsInList >0) + { + pid = (DWORD) jobInfo->ProcessIdList[0]; + FreeConsole(); + AttachConsole(pid); + SetConsoleCtrlHandler(NoopConsoleCtrlEventHandler, TRUE); + GenerateConsoleCtrlEvent(dwCtrlEvent, 0); + } + + LocalFree(jobInfo); + + CloseHandle(jobObject); + return ERROR_SUCCESS; +} + +DWORD sendBreakToTask(PCWSTR jobObjName) +{ + return sendConsoleCtrlEvent(jobObjName, CTRL_BREAK_EVENT); +} + +DWORD sendCtrlCToTask(PCWSTR jobObjName) +{ + return sendConsoleCtrlEvent(jobObjName, CTRL_C_EVENT); +} + //---------------------------------------------------------------------------- // Function: printTaskProcessList // @@ -362,7 +434,7 @@ DWORD printTaskProcessList(const WCHAR* jobObjName) // Function: Task // // Description: -// Manages a task via a jobobject (create/isAlive/kill). Outputs the +// Manages a task via a jobobject (create/isAlive/kill/sendCtrlC/sendBreak). Outputs the // appropriate information to stdout on success, or stderr on failure. // // Returns: @@ -425,6 +497,26 @@ int Task(__in int argc, __in_ecount(argc) wchar_t *argv[]) ReportErrorCode(L"killTask", dwErrorCode); goto TaskExit; } + } else if (command == TaskCtrlC) + { + // Check if task jobobject + // + dwErrorCode = sendCtrlCToTask(argv[2]); + if (dwErrorCode != ERROR_SUCCESS) + { + ReportErrorCode(L"sendCtrlCToTask", dwErrorCode); + goto TaskExit; + } + } else if (command == TaskBreak) + { + // Check if task jobobject + // + dwErrorCode = sendBreakToTask(argv[2]); + if (dwErrorCode != ERROR_SUCCESS) + { + ReportErrorCode(L"sendBreakToTask", dwErrorCode); + goto TaskExit; + } } else if (command == TaskProcessList) { // Check if task jobobject @@ -455,6 +547,8 @@ void TaskUsage() Usage: task create [TASKNAME] [COMMAND_LINE] |\n\ task isAlive [TASKNAME] |\n\ task kill [TASKNAME]\n\ + task sendCtrlC [TASKNAME]\n\ + task sendBreak [TASKNAME]\n\ task processList [TASKNAME]\n\ Creates a new task jobobject with taskname\n\ Checks if task jobobject is alive\n\ diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java index dd1060c..1e55d97 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java @@ -56,6 +56,8 @@ import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainersResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; @@ -71,6 +73,7 @@ import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy; import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy.ContainerManagementProtocolProxyData; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC; @@ -438,5 +441,11 @@ public StopContainersResponse stopContainers(StopContainersRequest request) "Dummy function cause")); throw new IOException(e); } + + @Override + public SignalContainersResponse signalContainers( + SignalContainersRequest request) throws YarnException, IOException { + return null; + } } } \ No newline at end of file diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java index 6f21c87..2cdd7a9 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java @@ -47,6 +47,8 @@ import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainersResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest; @@ -443,6 +445,11 @@ public GetContainerStatusesResponse getContainerStatuses( GetContainerStatusesRequest request) throws IOException { return null; } + @Override + public SignalContainersResponse signalContainers( + SignalContainersRequest request) throws YarnException, IOException { + return null; + } } @SuppressWarnings("serial") diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ContainerManagementProtocol.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ContainerManagementProtocol.java index 7aa43df..821f53c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ContainerManagementProtocol.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ContainerManagementProtocol.java @@ -22,8 +22,11 @@ import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainersResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; @@ -170,4 +173,10 @@ StopContainersResponse stopContainers(StopContainersRequest request) GetContainerStatusesResponse getContainerStatuses( GetContainerStatusesRequest request) throws YarnException, IOException; + + @Public + @Unstable + SignalContainersResponse signalContainers(SignalContainersRequest request) + throws YarnException, IOException; + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SignalContainersRequest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SignalContainersRequest.java new file mode 100644 index 0000000..8ba5e45 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SignalContainersRequest.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords; + +import java.util.List; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.util.Records; + +/** + *

+ * The request sent by the ApplicationMaster to the + * NodeManager to send a signal to requested containers. + *

+ */ +@Public +@Stable +public abstract class SignalContainersRequest { + + @Public + @Stable + public static SignalContainersRequest newInstance( + List containerIds, + int signal) { + SignalContainersRequest request = + Records.newRecord(SignalContainersRequest.class); + request.setContainerIds(containerIds); + request.setSignal(signal); + return request; + } + + /** + * Get signal to send to containers. + */ + @Public + @Stable + public abstract int getSignal(); + + /** + * Set signal to send to containers. + */ + @Public + @Stable + public abstract void setSignal(int signal); + + /** + * Get the list of ContainerIds of containers to + * send the signal to. + * + * @return the list of ContainerIds of containers to + * send the signal to. + */ + @Public + @Stable + public abstract List getContainerIds(); + + /** + * Set a list of ContainerIds of containers to + * send the signal to. + * + * @param containerIds + * a list of ContainerIds of containers to + * send the signal to. + */ + @Public + @Stable + public abstract void setContainerIds(List containerIds); +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SignalContainersResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SignalContainersResponse.java new file mode 100644 index 0000000..979f1b2 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SignalContainersResponse.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords; + +import java.util.Map; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.SerializedException; +import org.apache.hadoop.yarn.util.Records; + +/** + *

+ * The response sent by the NodeManager to the + * ApplicationMaster when asked to send a signal + * to requested containers. + *

+ */ +@Public +@Stable +public abstract class SignalContainersResponse { + + @Private + @Unstable + public static SignalContainersResponse newInstance( + Map failedRequests) { + SignalContainersResponse response = + Records.newRecord(SignalContainersResponse.class); + response.setFailedRequests(failedRequests); + return response; + } + + /** + * Get the containerId-to-exception map in which the exception indicates error + * from per container for failed requests + */ + @Public + @Stable + public abstract Map getFailedRequests(); + + /** + * Set the containerId-to-exception map in which the exception indicates error + * from per container for failed requests + */ + @Private + @Unstable + public abstract void setFailedRequests( + Map failedContainers); + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/containermanagement_protocol.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/containermanagement_protocol.proto index 7b1647b..3bdb27a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/containermanagement_protocol.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/containermanagement_protocol.proto @@ -34,4 +34,5 @@ service ContainerManagementProtocolService { rpc startContainers(StartContainersRequestProto) returns (StartContainersResponseProto); rpc stopContainers(StopContainersRequestProto) returns (StopContainersResponseProto); rpc getContainerStatuses(GetContainerStatusesRequestProto) returns (GetContainerStatusesResponseProto); + rpc signalContainers(SignalContainersRequestProto) returns (SignalContainersResponseProto); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index 391019a..ea2b010 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -219,3 +219,12 @@ message GetContainerStatusesResponseProto { repeated ContainerStatusProto status = 1; repeated ContainerExceptionMapProto failed_requests = 2; } + +message SignalContainersRequestProto { + repeated ContainerIdProto container_id = 1; + required int32 signal = 2; +} + +message SignalContainersResponseProto { + repeated ContainerExceptionMapProto failed_requests = 1; +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagementProtocolPBClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagementProtocolPBClientImpl.java index 15397e3..579d7a9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagementProtocolPBClientImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagementProtocolPBClientImpl.java @@ -32,12 +32,16 @@ import org.apache.hadoop.yarn.api.ContainerManagementProtocolPB; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainersResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerStatusesRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerStatusesResponsePBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainersRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainersResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainersRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainersResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StopContainersRequestPBImpl; @@ -46,6 +50,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.ipc.RPCUtil; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerStatusesRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainersRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainersRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.StopContainersRequestProto; @@ -128,4 +133,18 @@ public GetContainerStatusesResponse getContainerStatuses( return null; } } + + @Override + public SignalContainersResponse signalContainers( + SignalContainersRequest request) throws YarnException, IOException { + SignalContainersRequestProto requestProto = + ((SignalContainersRequestPBImpl) request).getProto(); + try { + return new SignalContainersResponsePBImpl(proxy.signalContainers( + null, requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + return null; + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ContainerManagementProtocolPBServiceImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ContainerManagementProtocolPBServiceImpl.java index 2d33e69..b193808 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ContainerManagementProtocolPBServiceImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ContainerManagementProtocolPBServiceImpl.java @@ -24,10 +24,13 @@ import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.ContainerManagementProtocolPB; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainersResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerStatusesRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerStatusesResponsePBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainersRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainersResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainersRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainersResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StopContainersRequestPBImpl; @@ -35,6 +38,8 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerStatusesRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerStatusesResponseProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainersRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainersResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainersRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainersResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.StopContainersRequestProto; @@ -94,4 +99,19 @@ public GetContainerStatusesResponseProto getContainerStatuses( throw new ServiceException(e); } } + + @Override + public SignalContainersResponseProto signalContainers( + RpcController controller, SignalContainersRequestProto proto) + throws ServiceException { + SignalContainersRequestPBImpl request = new SignalContainersRequestPBImpl(proto); + try { + SignalContainersResponse response = real.signalContainers(request); + return ((SignalContainersResponsePBImpl)response).getProto(); + } catch (YarnException e) { + throw new ServiceException(e); + } catch (IOException e) { + throw new ServiceException(e); + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SignalContainersRequestPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SignalContainersRequestPBImpl.java new file mode 100644 index 0000000..fb06b17 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SignalContainersRequestPBImpl.java @@ -0,0 +1,167 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainersRequest; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainersRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainersRequestProtoOrBuilder; + +import com.google.protobuf.TextFormat; + +@Private +@Unstable +public class SignalContainersRequestPBImpl extends + SignalContainersRequest { + SignalContainersRequestProto proto = SignalContainersRequestProto + .getDefaultInstance(); + SignalContainersRequestProto.Builder builder = null; + boolean viaProto = false; + + private List containerIds = null; + private Integer signal = null; + + public SignalContainersRequestPBImpl() { + builder = SignalContainersRequestProto.newBuilder(); + } + + public SignalContainersRequestPBImpl( + SignalContainersRequestProto proto) { + this.proto = proto; + viaProto = true; + } + + public SignalContainersRequestProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) + return false; + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + + private void mergeLocalToBuilder() { + if (this.containerIds != null) { + addLocalContainerIdsToProto(); + } + } + + private void mergeLocalToProto() { + if (viaProto) + maybeInitBuilder(); + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = SignalContainersRequestProto.newBuilder(proto); + } + viaProto = false; + } + + private void addLocalContainerIdsToProto() { + maybeInitBuilder(); + builder.clearContainerId(); + if (this.containerIds == null) + return; + List protoList = new ArrayList(); + for (ContainerId id : containerIds) { + protoList.add(convertToProtoFormat(id)); + } + builder.addAllContainerId(protoList); + } + + private void initLocalContainerIds() { + if (this.containerIds != null) { + return; + } + SignalContainersRequestProtoOrBuilder p = viaProto ? proto : builder; + List containerIds = p.getContainerIdList(); + this.containerIds = new ArrayList(); + for (ContainerIdProto id : containerIds) { + this.containerIds.add(convertFromProtoFormat(id)); + } + } + + @Override + public List getContainerIds() { + initLocalContainerIds(); + return this.containerIds; + } + + @Override + public void setContainerIds(List containerIds) { + maybeInitBuilder(); + if (containerIds == null) + builder.clearContainerId(); + this.containerIds = containerIds; + } + + private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) { + return new ContainerIdPBImpl(p); + } + + private ContainerIdProto convertToProtoFormat(ContainerId t) { + return ((ContainerIdPBImpl) t).getProto(); + } + + @Override + public int getSignal() { + SignalContainersRequestProtoOrBuilder p = viaProto ? proto : builder; + if (this.signal != null) { + return this.signal; + } + this.signal = p.getSignal(); + return this.signal; + } + + @Override + public void setSignal(int signal) { + maybeInitBuilder(); + this.signal = signal; + } + +} \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SignalContainersResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SignalContainersResponsePBImpl.java new file mode 100644 index 0000000..40d88d3 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SignalContainersResponsePBImpl.java @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainersResponse; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.SerializedException; +import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; +import org.apache.hadoop.yarn.proto.YarnProtos.SerializedExceptionProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.ContainerExceptionMapProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainersResponseProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainersResponseProtoOrBuilder; + +import com.google.protobuf.TextFormat; + +@Private +@Unstable +public class SignalContainersResponsePBImpl extends + SignalContainersResponse { + SignalContainersResponseProto proto = SignalContainersResponseProto + .getDefaultInstance(); + SignalContainersResponseProto.Builder builder = null; + boolean viaProto = false; + + private Map failedRequests = null; + + public SignalContainersResponsePBImpl() { + builder = SignalContainersResponseProto.newBuilder(); + } + + public SignalContainersResponsePBImpl( + SignalContainersResponseProto proto) { + this.proto = proto; + viaProto = true; + } + + public SignalContainersResponseProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) + return false; + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + + private void mergeLocalToBuilder() { + if (this.failedRequests != null) { + addFailedRequestsToProto(); + } + } + + private void mergeLocalToProto() { + if (viaProto) + maybeInitBuilder(); + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = SignalContainersResponseProto.newBuilder(proto); + } + viaProto = false; + } + + private void addFailedRequestsToProto() { + maybeInitBuilder(); + builder.clearFailedRequests(); + if (this.failedRequests == null) + return; + List protoList = + new ArrayList(); + for (Map.Entry entry : this.failedRequests + .entrySet()) { + protoList.add(ContainerExceptionMapProto.newBuilder() + .setContainerId(convertToProtoFormat(entry.getKey())) + .setException(convertToProtoFormat(entry.getValue())).build()); + } + builder.addAllFailedRequests(protoList); + } + + private void initFailedRequests() { + if (this.failedRequests != null) { + return; + } + SignalContainersResponseProtoOrBuilder p = viaProto ? proto : builder; + List protoList = p.getFailedRequestsList(); + this.failedRequests = new HashMap(); + for (ContainerExceptionMapProto ce : protoList) { + this.failedRequests.put(convertFromProtoFormat(ce.getContainerId()), + convertFromProtoFormat(ce.getException())); + } + } + + @Override + public Map getFailedRequests() { + initFailedRequests(); + return this.failedRequests; + } + + @Override + public void setFailedRequests( + Map failedRequests) { + maybeInitBuilder(); + if (failedRequests == null) + builder.clearFailedRequests(); + this.failedRequests = failedRequests; + } + + private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) { + return new ContainerIdPBImpl(p); + } + + private ContainerIdProto convertToProtoFormat(ContainerId t) { + return ((ContainerIdPBImpl) t).getProto(); + } + + private SerializedExceptionPBImpl convertFromProtoFormat( + SerializedExceptionProto p) { + return new SerializedExceptionPBImpl(p); + } + + private SerializedExceptionProto convertToProtoFormat(SerializedException t) { + return ((SerializedExceptionPBImpl) t).getProto(); + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java index 8fe5c3c..0b9a15d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java @@ -35,6 +35,8 @@ import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainersResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; @@ -165,5 +167,11 @@ public GetContainerStatusesResponse getContainerStatuses( GetContainerStatusesResponse.newInstance(list, null); return null; } + + @Override + public SignalContainersResponse signalContainers( + SignalContainersRequest request) throws YarnException, IOException { + return null; + } } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java index 76384d3..4123f6a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java @@ -38,6 +38,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainersResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; @@ -218,6 +220,12 @@ public StopContainersResponse stopContainers(StopContainersRequest request) new Exception(EXCEPTION_CAUSE)); throw new YarnException(e); } + + @Override + public SignalContainersResponse signalContainers( + SignalContainersRequest request) throws YarnException, IOException { + return null; + } } public static ContainerTokenIdentifier newContainerTokenIdentifier( diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java index ee72fbc..c8eeb8c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java @@ -164,6 +164,14 @@ public int getValue() { public String toString() { return str; } + public static Signal valueOf(int signal_num) + throws IllegalArgumentException { + for (Signal s : Signal.values()) { + if (s.value == signal_num) + return s; + } + throw new IllegalArgumentException(); + } } protected void logOutput(String output) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 3091c4a..ddffcfb 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -56,6 +56,8 @@ import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainersResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; @@ -87,6 +89,8 @@ import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger; +import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal; +import org.apache.hadoop.yarn.server.nodemanager.NodeManagerEventType; import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.nodemanager.NodeManager; import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; @@ -104,6 +108,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.SignalContainersLauncherEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService; @@ -891,4 +896,50 @@ public Context getContext() { public Map getAuxServiceMetaData() { return this.auxiliaryServices.getMetaData(); } + + @Override + public SignalContainersResponse signalContainers( + SignalContainersRequest request) throws YarnException, IOException { + Map failedRequests = + new HashMap(); + UserGroupInformation remoteUgi = getRemoteUgi(); + NMTokenIdentifier identifier = selectNMTokenIdentifier(remoteUgi); + for (ContainerId id : request.getContainerIds()) { + try { + signalContainerInternal(request.getSignal(), id, identifier); + } catch (YarnException e) { + failedRequests.put(id, SerializedException.newInstance(e)); + } + } + return SignalContainersResponse.newInstance(failedRequests); + } + + private void signalContainerInternal(int signal_num, ContainerId containerID, + NMTokenIdentifier nmTokenIdentifier) throws YarnException { + String containerIDStr = containerID.toString(); + Container container = this.context.getContainers().get(containerID); + + Signal signal; + try { + signal = Signal.valueOf(signal_num); + } catch (IllegalArgumentException e) { + throw new YarnException("Non supported signal " + signal_num); + } + + LOG.info("Sending signal " + signal + " to container " + containerIDStr); + authorizeGetAndStopContainerRequest(containerID, container, false, + nmTokenIdentifier); + + if (container == null) { + if (nodeStatusUpdater.isContainerRecentlyStopped(containerID)) { + throw RPCUtil.getRemoteException("Container " + containerIDStr + + " was recently stopped on node manager."); + } else { + throw RPCUtil.getRemoteException("Container " + containerIDStr + + " is not handled by this NodeManager"); + } + } + this.containersLauncher.handle( + new SignalContainersLauncherEvent(container, signal)); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java index edc3146..646fd82 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java @@ -403,6 +403,66 @@ public void cleanupContainer() throws IOException { } /** + * Send a signal to the container. + * + * Acceptable signals: {@link Signal#QUIT} + * + * @throws IOException + */ + @SuppressWarnings("unchecked") // dispatcher not typed + public void signalContainer(Signal signal) throws IOException { + ContainerId containerId = container.getContainerId(); + String containerIdStr = ConverterUtils.toString(containerId); + String user = container.getUser(); + + if (signal != Signal.QUIT) { + LOG.debug("Not sending non supported signal " + signal + + " as user " + user + + " for container " + containerIdStr); + return; + } + + LOG.info("Sending signal " + signal + " to container " + containerIdStr); + + boolean alreadyLaunched = !shouldLaunchContainer.compareAndSet(false, true); + if (!alreadyLaunched) { + LOG.info("Container " + containerIdStr + " not launched." + + " Not sending the signal"); + return; + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Getting pid for container " + containerIdStr + + " to send signal to from pid file " + + (pidFilePath != null ? pidFilePath.toString() : "null")); + } + + try { + + // get process id from pid file if available + // else if shell is still active, get it from the shell + String processId = null; + if (pidFilePath != null) { + processId = getContainerPid(pidFilePath); + } + + if (processId != null) { + LOG.debug("Sending signal " + signal + " to pid " + processId + + " as user " + user + + " for container " + containerIdStr); + exec.signalContainer(user, processId, signal); + } + } catch (Exception e) { + String message = + "Exception when sending signal to container " + containerIdStr + + ": " + StringUtils.stringifyException(e); + LOG.warn(message); + dispatcher.getEventHandler().handle( + new ContainerDiagnosticsUpdateEvent(containerId, message)); + } + } + + /** * Loop through for a time-bounded interval waiting to * read the process id from a file generated by a running process. * @param pidFilePath File from which to read the process id diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java index ce865e3..9c53366 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java @@ -141,6 +141,21 @@ public void handle(ContainersLauncherEvent event) { + ". Ignoring."); } break; + case SIGNAL_CONTAINER: + SignalContainersLauncherEvent signalEvent = + (SignalContainersLauncherEvent) event; + launcher = running.get(containerId); + if (launcher == null) { + // Container not launched. So nothing needs to be done. + return; + } + try { + launcher.signalContainer(signalEvent.getSignal()); + } catch (IOException e) { + LOG.warn("Got exception while cleaning container " + containerId + + ". Ignoring."); + } + break; } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java index 6793bf7..92e1fcc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java @@ -21,4 +21,5 @@ public enum ContainersLauncherEventType { LAUNCH_CONTAINER, CLEANUP_CONTAINER, // The process(grp) itself. + SIGNAL_CONTAINER, } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/SignalContainersLauncherEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/SignalContainersLauncherEvent.java new file mode 100644 index 0000000..6848533 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/SignalContainersLauncherEvent.java @@ -0,0 +1,38 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher; + +import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; + +public class SignalContainersLauncherEvent + extends ContainersLauncherEvent { + + private final Signal signal; + + public SignalContainersLauncherEvent(Container container, Signal signal) { + super(container, ContainersLauncherEventType.SIGNAL_CONTAINER); + this.signal = signal; + } + + public Signal getSignal() { + return signal; + } + +} \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java index a47e7f7..c269531 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java @@ -97,6 +97,18 @@ public void testContainerManagerInitialization() throws IOException { } @Override + public void testSignalContainer() throws IOException, InterruptedException, + YarnException { + // Don't run the test if the binary is not available. + if (!shouldRunTest()) { + LOG.info("LCE binary path is not passed. Not running the test"); + return; + } + LOG.info("Running testSignalContainer"); + super.testSignalContainer(); + } + + @Override public void testContainerLaunchAndStop() throws IOException, InterruptedException, YarnException { // Don't run the test if the binary is not available. diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java index 4f23427..275bc1d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java @@ -262,21 +262,26 @@ public static void waitForContainerState(ContainerManagementProtocol containerMa GetContainerStatusesRequest request = GetContainerStatusesRequest.newInstance(list); ContainerStatus containerStatus = - containerManager.getContainerStatuses(request).getContainerStatuses() - .get(0); + getContainerStatus(containerManager, request); int timeoutSecs = 0; while (!containerStatus.getState().equals(finalState) && timeoutSecs++ < timeOutMax) { Thread.sleep(1000); LOG.info("Waiting for container to get into state " + finalState + ". Current state is " + containerStatus.getState()); - containerStatus = containerManager.getContainerStatuses(request).getContainerStatuses().get(0); + containerStatus = getContainerStatus(containerManager, request); } LOG.info("Container state is " + containerStatus.getState()); Assert.assertEquals("ContainerState is not correct (timedout)", finalState, containerStatus.getState()); } + public static ContainerStatus getContainerStatus( + ContainerManagementProtocol containerManager, + GetContainerStatusesRequest request) throws YarnException, IOException { + return containerManager.getContainerStatuses(request).getContainerStatuses().get(0); + } + static void waitForApplicationState(ContainerManagerImpl containerManager, ApplicationId appID, ApplicationState finalState) throws InterruptedException { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/SleepTool.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/SleepTool.java new file mode 100644 index 0000000..ce3506a --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/SleepTool.java @@ -0,0 +1,32 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager; + +/** + * A public class with the main method with just sleeps for 100 seconds. + * + * Used as a yarn container stub in {@link TestContainerManager#testSignalContainer()} + */ +public class SleepTool { + + public static void main(String[] args) throws Exception { + Thread.sleep(100*1000); + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java index f62cd50..e52d62b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java @@ -27,9 +27,11 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.regex.Pattern; import junit.framework.Assert; @@ -42,6 +44,7 @@ import org.apache.hadoop.util.Shell; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; @@ -69,6 +72,7 @@ import org.apache.hadoop.yarn.server.api.ResourceManagerConstants; import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode; +import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal; import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestAuxServices.ServiceA; @@ -83,6 +87,9 @@ import org.junit.Before; import org.junit.Test; +import com.google.common.base.Charsets; +import com.google.common.io.Files; + public class TestContainerManager extends BaseContainerManagerTest { public TestContainerManager() throws UnsupportedFileSystemException { @@ -249,92 +256,34 @@ public void testContainerSetup() throws Exception { // Now verify the contents of the file BufferedReader reader = new BufferedReader(new FileReader(targetFile)); - Assert.assertEquals("Hello World!", reader.readLine()); - Assert.assertEquals(null, reader.readLine()); + try { + Assert.assertEquals("Hello World!", reader.readLine()); + Assert.assertEquals(null, reader.readLine()); + } finally { + reader.close(); + } } @Test public void testContainerLaunchAndStop() throws IOException, InterruptedException, YarnException { - containerManager.start(); - - File scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile"); - PrintWriter fileWriter = new PrintWriter(scriptFile); - File processStartFile = - new File(tmpDir, "start_file.txt").getAbsoluteFile(); - - // ////// Construct the Container-id + // Construct the Container-id ContainerId cId = createContainerId(0); - - if (Shell.WINDOWS) { - fileWriter.println("@echo Hello World!> " + processStartFile); - fileWriter.println("@echo " + cId + ">> " + processStartFile); - fileWriter.println("@ping -n 100 127.0.0.1 >nul"); - } else { - fileWriter.write("\numask 0"); // So that start file is readable by the test - fileWriter.write("\necho Hello World! > " + processStartFile); - fileWriter.write("\necho $$ >> " + processStartFile); - fileWriter.write("\nexec sleep 100"); - } - fileWriter.close(); - - ContainerLaunchContext containerLaunchContext = - recordFactory.newRecordInstance(ContainerLaunchContext.class); - - URL resource_alpha = - ConverterUtils.getYarnUrlFromPath(localFS - .makeQualified(new Path(scriptFile.getAbsolutePath()))); - LocalResource rsrc_alpha = - recordFactory.newRecordInstance(LocalResource.class); - rsrc_alpha.setResource(resource_alpha); - rsrc_alpha.setSize(-1); - rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION); - rsrc_alpha.setType(LocalResourceType.FILE); - rsrc_alpha.setTimestamp(scriptFile.lastModified()); - String destinationFile = "dest_file"; - Map localResources = - new HashMap(); - localResources.put(destinationFile, rsrc_alpha); - containerLaunchContext.setLocalResources(localResources); - List commands = Arrays.asList(Shell.getRunScriptCommand(scriptFile)); - containerLaunchContext.setCommands(commands); - - StartContainerRequest scRequest = - StartContainerRequest.newInstance(containerLaunchContext, - createContainerToken(cId, - DUMMY_RM_IDENTIFIER, context.getNodeId(), user, - context.getContainerTokenSecretManager())); - List list = new ArrayList(); - list.add(scRequest); - StartContainersRequest allRequests = - StartContainersRequest.newInstance(list); - containerManager.startContainers(allRequests); - - int timeoutSecs = 0; - while (!processStartFile.exists() && timeoutSecs++ < 20) { - Thread.sleep(1000); - LOG.info("Waiting for process start-file to be created"); - } - Assert.assertTrue("ProcessStartFile doesn't exist!", - processStartFile.exists()); - // Now verify the contents of the file - BufferedReader reader = - new BufferedReader(new FileReader(processStartFile)); - Assert.assertEquals("Hello World!", reader.readLine()); - // Get the pid of the process - String pid = reader.readLine().trim(); - // No more lines - Assert.assertEquals(null, reader.readLine()); - - // Now test the stop functionality. - - // Assert that the process is alive - Assert.assertTrue("Process is not alive!", - DefaultContainerExecutor.containerIsAlive(pid)); - // Once more - Assert.assertTrue("Process is not alive!", - DefaultContainerExecutor.containerIsAlive(pid)); + ShellScriptResourcesProvider resourcesProvider = new ShellScriptResourcesProvider(cId) { + + @Override + public void appendCommands(PrintWriter scriptWriter) { + if (Shell.WINDOWS) { + scriptWriter.println("@ping -n 100 127.0.0.1 >nul"); + } else { + scriptWriter.write("\nexec sleep 100"); + } + } + + }; + startContainer(cId, resourcesProvider); + String pid = resourcesProvider.waitForStartFile(); List containerIds = new ArrayList(); containerIds.add(cId); @@ -356,67 +305,128 @@ public void testContainerLaunchAndStop() throws IOException, Assert.assertFalse("Process is still alive!", DefaultContainerExecutor.containerIsAlive(pid)); } - - private void testContainerLaunchAndExit(int exitCode) throws IOException, + + @Test + public void testSignalContainer() throws IOException, InterruptedException, YarnException { + // Construct the Container-id + ContainerId cId = createContainerId(0); - File scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile"); - PrintWriter fileWriter = new PrintWriter(scriptFile); - File processStartFile = - new File(tmpDir, "start_file.txt").getAbsoluteFile(); - - // ////// Construct the Container-id - ContainerId cId = createContainerId(0); - - if (Shell.WINDOWS) { - fileWriter.println("@echo Hello World!> " + processStartFile); - fileWriter.println("@echo " + cId + ">> " + processStartFile); - if (exitCode != 0) { - fileWriter.println("@exit " + exitCode); - } - } else { - fileWriter.write("\numask 0"); // So that start file is readable by the test - fileWriter.write("\necho Hello World! > " + processStartFile); - fileWriter.write("\necho $$ >> " + processStartFile); - // Have script throw an exit code at the end - if (exitCode != 0) { - fileWriter.write("\nexit "+exitCode); - } - } - - fileWriter.close(); - - ContainerLaunchContext containerLaunchContext = - recordFactory.newRecordInstance(ContainerLaunchContext.class); - - URL resource_alpha = - ConverterUtils.getYarnUrlFromPath(localFS - .makeQualified(new Path(scriptFile.getAbsolutePath()))); - LocalResource rsrc_alpha = - recordFactory.newRecordInstance(LocalResource.class); - rsrc_alpha.setResource(resource_alpha); - rsrc_alpha.setSize(-1); - rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION); - rsrc_alpha.setType(LocalResourceType.FILE); - rsrc_alpha.setTimestamp(scriptFile.lastModified()); - String destinationFile = "dest_file"; - Map localResources = - new HashMap(); - localResources.put(destinationFile, rsrc_alpha); - containerLaunchContext.setLocalResources(localResources); - List commands = Arrays.asList(Shell.getRunScriptCommand(scriptFile)); - containerLaunchContext.setCommands(commands); + final File logFile = new File(tmpDir, "testSignalContainer.out") + .getAbsoluteFile(); + + ShellScriptResourcesProvider resourcesProvider = + new ShellScriptResourcesProvider(cId) { + + @Override + void appendCommands(PrintWriter scriptWriter) { + File testClassesDir = new File("target", "test-classes") + .getAbsoluteFile(); + String cmd = "java -classpath " + testClassesDir + " " + + SleepTool.class.getCanonicalName() + + " >" + logFile + " 2>&1"; + if (Shell.WINDOWS) { + scriptWriter.println(cmd); + } else { + scriptWriter.write("\nexec " + cmd); + } + } + + }; + startContainer(cId, resourcesProvider); + resourcesProvider.waitForStartFile(); + + List containerIds = new ArrayList(); + containerIds.add(cId); + SignalContainersRequest signalRequest = + SignalContainersRequest.newInstance(containerIds, Signal.QUIT.getValue()); + containerManager.signalContainers(signalRequest); + + GetContainerStatusesRequest gcsRequest = + GetContainerStatusesRequest.newInstance(containerIds); + + Thread.sleep(100); + ContainerStatus containerStatus = + BaseContainerManagerTest.getContainerStatus(containerManager, gcsRequest); + Assert.assertEquals("Container state changed", ContainerState.RUNNING, + containerStatus.getState()); + + String containerLog = Files.toString(logFile, Charsets.US_ASCII); + System.out.println(containerLog); + Assert.assertTrue( + Pattern.compile("Full thread dump").matcher(containerLog).find()); + + StopContainersRequest stopRequest = + StopContainersRequest.newInstance(containerIds); + containerManager.stopContainers(stopRequest); + BaseContainerManagerTest.waitForContainerState(containerManager, cId, + ContainerState.COMPLETE); + } + + private void startContainer(ContainerId cId, LocalResourcesProvider resourcesProvider) + throws IOException, YarnException, InterruptedException { + containerManager.start(); + + ContainerLaunchContext containerLaunchContext = + recordFactory.newRecordInstance(ContainerLaunchContext.class); + + Map localResources = + new HashMap(); + + for (Map.Entry e : resourcesProvider.getLocalResources().entrySet()) { + String destinationFile = e.getKey(); + File localResource = e.getValue(); + + URL resource_alpha = + ConverterUtils.getYarnUrlFromPath(localFS + .makeQualified(new Path(localResource.getAbsolutePath()))); + LocalResource rsrc_alpha = + recordFactory.newRecordInstance(LocalResource.class); + rsrc_alpha.setResource(resource_alpha); + rsrc_alpha.setSize(-1); + rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION); + rsrc_alpha.setType(LocalResourceType.FILE); + rsrc_alpha.setTimestamp(localResource.lastModified()); + localResources.put(destinationFile, rsrc_alpha); + } + containerLaunchContext.setLocalResources(localResources); + + List commands = Arrays.asList( + Shell.getRunScriptCommand(resourcesProvider.getScriptFile())); + containerLaunchContext.setCommands(commands); StartContainerRequest scRequest = - StartContainerRequest.newInstance( - containerLaunchContext, - createContainerToken(cId, DUMMY_RM_IDENTIFIER, context.getNodeId(), - user, context.getContainerTokenSecretManager())); + StartContainerRequest.newInstance(containerLaunchContext, + createContainerToken(cId, + DUMMY_RM_IDENTIFIER, context.getNodeId(), user, + context.getContainerTokenSecretManager())); List list = new ArrayList(); list.add(scRequest); StartContainersRequest allRequests = StartContainersRequest.newInstance(list); containerManager.startContainers(allRequests); + } + + private void testContainerLaunchAndExit(final int exitCode) throws IOException, + InterruptedException, YarnException { + // Construct the Container-id + ContainerId cId = createContainerId(0); + + LocalResourcesProvider resourcesProvider = + new ShellScriptResourcesProvider(cId) { + + @Override + void appendCommands(PrintWriter scriptWriter) { + if (exitCode != 0) { + if (Shell.WINDOWS) { + scriptWriter.println("@exit " + exitCode); + } else { + scriptWriter.write("\nexit "+exitCode); + } + } + } + }; + startContainer(cId, resourcesProvider); BaseContainerManagerTest.waitForContainerState(containerManager, cId, ContainerState.COMPLETE); @@ -807,4 +817,80 @@ public static Token createContainerToken(ContainerId cId, long rmIdentifier, containerTokenIdentifier); return containerToken; } + + private static interface LocalResourcesProvider { + Map getLocalResources() throws IOException; + File getScriptFile() throws IOException; + } + + private static abstract class ShellScriptResourcesProvider + implements LocalResourcesProvider { + + final File scriptFile; + final File processStartFile; + + public ShellScriptResourcesProvider(ContainerId cId) throws IOException { + this.scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile"); + PrintWriter scriptWriter = new PrintWriter(scriptFile); + this.processStartFile = + new File(tmpDir, "start_file.txt").getAbsoluteFile(); + + if (Shell.WINDOWS) { + scriptWriter.println("@echo Hello World!> " + processStartFile); + scriptWriter.println("@echo " + cId + ">> " + processStartFile); + } else { + scriptWriter.write("\numask 0"); // So that start file is readable by the test + scriptWriter.write("\necho Hello World! > " + processStartFile); + scriptWriter.write("\necho $$ >> " + processStartFile); + } + appendCommands(scriptWriter); + scriptWriter.close(); + } + + abstract void appendCommands(PrintWriter scriptWriter); + + @Override + public Map getLocalResources() throws IOException { + return Collections.singletonMap("dest_file", scriptFile); + } + + @Override + public File getScriptFile() throws IOException { + return scriptFile; + } + + public String waitForStartFile() throws IOException, InterruptedException { + int timeoutSecs = 0; + while (!processStartFile.exists() && timeoutSecs++ < 20) { + Thread.sleep(1000); + LOG.info("Waiting for process start-file to be created"); + } + Assert.assertTrue("ProcessStartFile doesn't exist!", + processStartFile.exists()); + + // Now verify the contents of the file + BufferedReader reader = + new BufferedReader(new FileReader(processStartFile)); + try { + Assert.assertEquals("Hello World!", reader.readLine()); + // Get the pid of the process + String pid = reader.readLine().trim(); + // No more lines + Assert.assertEquals(null, reader.readLine()); + + // Assert that the process is alive + Assert.assertTrue("Process is not alive!", + DefaultContainerExecutor.containerIsAlive(pid)); + // Once more + Assert.assertTrue("Process is not alive!", + DefaultContainerExecutor.containerIsAlive(pid)); + + return pid; + } finally { + reader.close(); + } + } + + } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java index c9e57a6..1476437 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -33,6 +34,8 @@ import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainersResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; @@ -45,6 +48,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.SerializedException; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.factories.RecordFactory; @@ -308,4 +312,11 @@ synchronized public GetContainerStatusesResponse getContainerStatuses( nodeStatus.setNodeHealthStatus(nodeHealthStatus); return nodeStatus; } + + @Override + public SignalContainersResponse signalContainers( + SignalContainersRequest request) throws YarnException, IOException { + return SignalContainersResponse.newInstance( + Collections.emptyMap()); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAMAuthorization.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAMAuthorization.java index a9f1c1a..40ef848 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAMAuthorization.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAMAuthorization.java @@ -43,6 +43,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainersResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest; @@ -129,6 +131,12 @@ public Credentials getContainerCredentials() throws IOException { credentials.readTokenStorageStream(buf); return credentials; } + + @Override + public SignalContainersResponse signalContainers( + SignalContainersRequest request) throws YarnException, IOException { + return SignalContainersResponse.newInstance(null); + } } public static class MockRMWithAMS extends MockRMWithCustomAMLauncher { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java index 64e5cc9..6c77e74 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java @@ -31,6 +31,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainersResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; @@ -118,6 +120,12 @@ public GetContainerStatusesResponse getContainerStatuses( GetContainerStatusesRequest request) throws YarnException { return null; } + + @Override + public SignalContainersResponse signalContainers( + SignalContainersRequest request) throws YarnException, IOException { + return null; + } } @Test