diff --git hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Shell.java hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Shell.java
index 8013f22..6fd0b8f 100644
--- hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Shell.java
+++ hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Shell.java
@@ -177,8 +177,13 @@ static private OSType getOSType() {
/** Return a command to send a signal to a given pid */
public static String[] getSignalKillCommand(int code, String pid) {
- return Shell.WINDOWS ? new String[] { Shell.WINUTILS, "task", "kill", pid } :
- new String[] { "kill", "-" + code, isSetsidAvailable ? "-" + pid : pid };
+ if (Shell.WINDOWS) {
+ String cmd = code == 3 ? "sendBreak" : "kill";
+ return new String[] { Shell.WINUTILS, "task", cmd , pid };
+ } else {
+ return new String[]
+ { "kill", "-" + code, isSetsidAvailable ? "-" + pid : pid };
+ }
}
/** Return a regular expression string that match environment variables */
diff --git hadoop-common-project/hadoop-common/src/main/winutils/task.c hadoop-common-project/hadoop-common/src/main/winutils/task.c
index 19bda96..a890776 100644
--- hadoop-common-project/hadoop-common/src/main/winutils/task.c
+++ hadoop-common-project/hadoop-common/src/main/winutils/task.c
@@ -36,6 +36,7 @@ typedef enum TaskCommandOptionType
TaskCreate,
TaskIsAlive,
TaskKill,
+ TaskBreak,
TaskProcessList
} TaskCommandOption;
@@ -71,6 +72,11 @@ static BOOL ParseCommandLine(__in int argc,
*command = TaskKill;
return TRUE;
}
+ if (wcscmp(argv[1], L"sendBreak") == 0)
+ {
+ *command = TaskBreak;
+ return TRUE;
+ }
if (wcscmp(argv[1], L"processList") == 0)
{
*command = TaskProcessList;
@@ -89,6 +95,14 @@ static BOOL ParseCommandLine(__in int argc,
return FALSE;
}
+BOOL NoopCtrlHandler( DWORD fdwCtrlType )
+{
+ if ( fdwCtrlType == CTRL_BREAK_EVENT ) {
+ return TRUE;
+ }
+ return FALSE;
+}
+
//----------------------------------------------------------------------------
// Function: createTask
//
@@ -149,6 +163,8 @@ DWORD createTask(__in PCWSTR jobObjName,__in PWSTR cmdLine)
si.cb = sizeof(si);
ZeroMemory( &pi, sizeof(pi) );
+ SetConsoleCtrlHandler(NoopCtrlHandler, TRUE);
+
if (CreateProcess(NULL, cmdLine, NULL, NULL, TRUE, 0, NULL, NULL, &si, &pi) == 0)
{
err = GetLastError();
@@ -279,6 +295,46 @@ DWORD killTask(PCWSTR jobObjName)
return ERROR_SUCCESS;
}
+DWORD sendBreakToTask(PCWSTR jobObjName)
+{
+ HANDLE jobObject = OpenJobObject(JOB_OBJECT_QUERY, FALSE, jobObjName);
+ JOBOBJECT_BASIC_PROCESS_ID_LIST* jobInfo;
+ DWORD i;
+ DWORD pid;
+
+ if(jobObject == NULL)
+ {
+ DWORD err = GetLastError();
+ if(err == ERROR_FILE_NOT_FOUND)
+ {
+ // job object does not exist. assume its not alive
+ return ERROR_SUCCESS;
+ }
+ return err;
+ }
+
+ jobInfo = (JOBOBJECT_BASIC_PROCESS_ID_LIST*) LocalAlloc(LPTR, sizeof (JOBOBJECT_BASIC_PROCESS_ID_LIST) + sizeof(ULONG)*100);
+
+ if(QueryInformationJobObject(jobObject, JobObjectBasicProcessIdList, jobInfo, sizeof (JOBOBJECT_BASIC_PROCESS_ID_LIST) + sizeof(ULONG)*100, NULL) == 0)
+ {
+ return GetLastError();
+ }
+
+ for (i = 0; i < jobInfo->NumberOfProcessIdsInList; i++)
+ {
+ pid = (DWORD) jobInfo->ProcessIdList[i];
+ FreeConsole();
+ AttachConsole(pid);
+ SetConsoleCtrlHandler(NoopCtrlHandler, TRUE);
+ GenerateConsoleCtrlEvent(CTRL_BREAK_EVENT, pid);
+ }
+
+ LocalFree(jobInfo);
+
+ CloseHandle(jobObject);
+ return ERROR_SUCCESS;
+}
+
//----------------------------------------------------------------------------
// Function: printTaskProcessList
//
@@ -362,7 +418,7 @@ DWORD printTaskProcessList(const WCHAR* jobObjName)
// Function: Task
//
// Description:
-// Manages a task via a jobobject (create/isAlive/kill). Outputs the
+// Manages a task via a jobobject (create/isAlive/kill/sendBreak). Outputs the
// appropriate information to stdout on success, or stderr on failure.
//
// Returns:
@@ -425,6 +481,16 @@ int Task(__in int argc, __in_ecount(argc) wchar_t *argv[])
ReportErrorCode(L"killTask", dwErrorCode);
goto TaskExit;
}
+ } else if (command == TaskBreak)
+ {
+ // Check if task jobobject
+ //
+ dwErrorCode = sendBreakToTask(argv[2]);
+ if (dwErrorCode != ERROR_SUCCESS)
+ {
+ ReportErrorCode(L"sendBreakToTask", dwErrorCode);
+ goto TaskExit;
+ }
} else if (command == TaskProcessList)
{
// Check if task jobobject
@@ -455,6 +521,7 @@ void TaskUsage()
Usage: task create [TASKNAME] [COMMAND_LINE] |\n\
task isAlive [TASKNAME] |\n\
task kill [TASKNAME]\n\
+ task sendBreak [TASKNAME]\n\
task processList [TASKNAME]\n\
Creates a new task jobobject with taskname\n\
Checks if task jobobject is alive\n\
diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java
index dd1060c..1e55d97 100644
--- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java
+++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java
@@ -56,6 +56,8 @@
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
@@ -71,6 +73,7 @@
import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy;
import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy.ContainerManagementProtocolProxyData;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC;
@@ -438,5 +441,11 @@ public StopContainersResponse stopContainers(StopContainersRequest request)
"Dummy function cause"));
throw new IOException(e);
}
+
+ @Override
+ public SignalContainersResponse signalContainers(
+ SignalContainersRequest request) throws YarnException, IOException {
+ return null;
+ }
}
}
\ No newline at end of file
diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java
index 6f21c87..2cdd7a9 100644
--- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java
+++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java
@@ -47,6 +47,8 @@
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
@@ -443,6 +445,11 @@ public GetContainerStatusesResponse getContainerStatuses(
GetContainerStatusesRequest request) throws IOException {
return null;
}
+ @Override
+ public SignalContainersResponse signalContainers(
+ SignalContainersRequest request) throws YarnException, IOException {
+ return null;
+ }
}
@SuppressWarnings("serial")
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ContainerManagementProtocol.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ContainerManagementProtocol.java
index 7aa43df..821f53c 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ContainerManagementProtocol.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ContainerManagementProtocol.java
@@ -22,8 +22,11 @@
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
@@ -170,4 +173,10 @@ StopContainersResponse stopContainers(StopContainersRequest request)
GetContainerStatusesResponse getContainerStatuses(
GetContainerStatusesRequest request) throws YarnException,
IOException;
+
+ @Public
+ @Unstable
+ SignalContainersResponse signalContainers(SignalContainersRequest request)
+ throws YarnException, IOException;
+
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SignalContainersRequest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SignalContainersRequest.java
new file mode 100644
index 0000000..8ba5e45
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SignalContainersRequest.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.protocolrecords;
+
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Stable;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ *
+ * The request sent by the ApplicationMaster to the
+ * NodeManager to send a signal to requested containers.
+ *
+ */
+@Public
+@Stable
+public abstract class SignalContainersRequest {
+
+ @Public
+ @Stable
+ public static SignalContainersRequest newInstance(
+ List containerIds,
+ int signal) {
+ SignalContainersRequest request =
+ Records.newRecord(SignalContainersRequest.class);
+ request.setContainerIds(containerIds);
+ request.setSignal(signal);
+ return request;
+ }
+
+ /**
+ * Get signal to send to containers.
+ */
+ @Public
+ @Stable
+ public abstract int getSignal();
+
+ /**
+ * Set signal to send to containers.
+ */
+ @Public
+ @Stable
+ public abstract void setSignal(int signal);
+
+ /**
+ * Get the list of ContainerIds of containers to
+ * send the signal to.
+ *
+ * @return the list of ContainerIds of containers to
+ * send the signal to.
+ */
+ @Public
+ @Stable
+ public abstract List getContainerIds();
+
+ /**
+ * Set a list of ContainerIds of containers to
+ * send the signal to.
+ *
+ * @param containerIds
+ * a list of ContainerIds of containers to
+ * send the signal to.
+ */
+ @Public
+ @Stable
+ public abstract void setContainerIds(List containerIds);
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SignalContainersResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SignalContainersResponse.java
new file mode 100644
index 0000000..979f1b2
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SignalContainersResponse.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.protocolrecords;
+
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Stable;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.SerializedException;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ *
+ * The response sent by the NodeManager to the
+ * ApplicationMaster when asked to send a signal
+ * to requested containers.
+ *
+ */
+@Public
+@Stable
+public abstract class SignalContainersResponse {
+
+ @Private
+ @Unstable
+ public static SignalContainersResponse newInstance(
+ Map failedRequests) {
+ SignalContainersResponse response =
+ Records.newRecord(SignalContainersResponse.class);
+ response.setFailedRequests(failedRequests);
+ return response;
+ }
+
+ /**
+ * Get the containerId-to-exception map in which the exception indicates error
+ * from per container for failed requests
+ */
+ @Public
+ @Stable
+ public abstract Map getFailedRequests();
+
+ /**
+ * Set the containerId-to-exception map in which the exception indicates error
+ * from per container for failed requests
+ */
+ @Private
+ @Unstable
+ public abstract void setFailedRequests(
+ Map failedContainers);
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/containermanagement_protocol.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/containermanagement_protocol.proto
index 7b1647b..3bdb27a 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/containermanagement_protocol.proto
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/containermanagement_protocol.proto
@@ -34,4 +34,5 @@ service ContainerManagementProtocolService {
rpc startContainers(StartContainersRequestProto) returns (StartContainersResponseProto);
rpc stopContainers(StopContainersRequestProto) returns (StopContainersResponseProto);
rpc getContainerStatuses(GetContainerStatusesRequestProto) returns (GetContainerStatusesResponseProto);
+ rpc signalContainers(SignalContainersRequestProto) returns (SignalContainersResponseProto);
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
index 391019a..ea2b010 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
@@ -219,3 +219,12 @@ message GetContainerStatusesResponseProto {
repeated ContainerStatusProto status = 1;
repeated ContainerExceptionMapProto failed_requests = 2;
}
+
+message SignalContainersRequestProto {
+ repeated ContainerIdProto container_id = 1;
+ required int32 signal = 2;
+}
+
+message SignalContainersResponseProto {
+ repeated ContainerExceptionMapProto failed_requests = 1;
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagementProtocolPBClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagementProtocolPBClientImpl.java
index 15397e3..579d7a9 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagementProtocolPBClientImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagementProtocolPBClientImpl.java
@@ -32,12 +32,16 @@
import org.apache.hadoop.yarn.api.ContainerManagementProtocolPB;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerStatusesRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerStatusesResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainersRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainersResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainersRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainersResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StopContainersRequestPBImpl;
@@ -46,6 +50,7 @@
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerStatusesRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainersRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainersRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.StopContainersRequestProto;
@@ -128,4 +133,18 @@ public GetContainerStatusesResponse getContainerStatuses(
return null;
}
}
+
+ @Override
+ public SignalContainersResponse signalContainers(
+ SignalContainersRequest request) throws YarnException, IOException {
+ SignalContainersRequestProto requestProto =
+ ((SignalContainersRequestPBImpl) request).getProto();
+ try {
+ return new SignalContainersResponsePBImpl(proxy.signalContainers(
+ null, requestProto));
+ } catch (ServiceException e) {
+ RPCUtil.unwrapAndThrowException(e);
+ return null;
+ }
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ContainerManagementProtocolPBServiceImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ContainerManagementProtocolPBServiceImpl.java
index 2d33e69..b193808 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ContainerManagementProtocolPBServiceImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ContainerManagementProtocolPBServiceImpl.java
@@ -24,10 +24,13 @@
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.ContainerManagementProtocolPB;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerStatusesRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerStatusesResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainersRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainersResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainersRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainersResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StopContainersRequestPBImpl;
@@ -35,6 +38,8 @@
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerStatusesRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerStatusesResponseProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainersRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainersResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainersRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainersResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.StopContainersRequestProto;
@@ -94,4 +99,19 @@ public GetContainerStatusesResponseProto getContainerStatuses(
throw new ServiceException(e);
}
}
+
+ @Override
+ public SignalContainersResponseProto signalContainers(
+ RpcController controller, SignalContainersRequestProto proto)
+ throws ServiceException {
+ SignalContainersRequestPBImpl request = new SignalContainersRequestPBImpl(proto);
+ try {
+ SignalContainersResponse response = real.signalContainers(request);
+ return ((SignalContainersResponsePBImpl)response).getProto();
+ } catch (YarnException e) {
+ throw new ServiceException(e);
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SignalContainersRequestPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SignalContainersRequestPBImpl.java
new file mode 100644
index 0000000..fb06b17
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SignalContainersRequestPBImpl.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainersRequest;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
+import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainersRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainersRequestProtoOrBuilder;
+
+import com.google.protobuf.TextFormat;
+
+@Private
+@Unstable
+public class SignalContainersRequestPBImpl extends
+ SignalContainersRequest {
+ SignalContainersRequestProto proto = SignalContainersRequestProto
+ .getDefaultInstance();
+ SignalContainersRequestProto.Builder builder = null;
+ boolean viaProto = false;
+
+ private List containerIds = null;
+ private Integer signal = null;
+
+ public SignalContainersRequestPBImpl() {
+ builder = SignalContainersRequestProto.newBuilder();
+ }
+
+ public SignalContainersRequestPBImpl(
+ SignalContainersRequestProto proto) {
+ this.proto = proto;
+ viaProto = true;
+ }
+
+ public SignalContainersRequestProto getProto() {
+ mergeLocalToProto();
+ proto = viaProto ? proto : builder.build();
+ viaProto = true;
+ return proto;
+ }
+
+ @Override
+ public int hashCode() {
+ return getProto().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == null)
+ return false;
+ if (other.getClass().isAssignableFrom(this.getClass())) {
+ return this.getProto().equals(this.getClass().cast(other).getProto());
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return TextFormat.shortDebugString(getProto());
+ }
+
+ private void mergeLocalToBuilder() {
+ if (this.containerIds != null) {
+ addLocalContainerIdsToProto();
+ }
+ }
+
+ private void mergeLocalToProto() {
+ if (viaProto)
+ maybeInitBuilder();
+ mergeLocalToBuilder();
+ proto = builder.build();
+ viaProto = true;
+ }
+
+ private void maybeInitBuilder() {
+ if (viaProto || builder == null) {
+ builder = SignalContainersRequestProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+
+ private void addLocalContainerIdsToProto() {
+ maybeInitBuilder();
+ builder.clearContainerId();
+ if (this.containerIds == null)
+ return;
+ List protoList = new ArrayList();
+ for (ContainerId id : containerIds) {
+ protoList.add(convertToProtoFormat(id));
+ }
+ builder.addAllContainerId(protoList);
+ }
+
+ private void initLocalContainerIds() {
+ if (this.containerIds != null) {
+ return;
+ }
+ SignalContainersRequestProtoOrBuilder p = viaProto ? proto : builder;
+ List containerIds = p.getContainerIdList();
+ this.containerIds = new ArrayList();
+ for (ContainerIdProto id : containerIds) {
+ this.containerIds.add(convertFromProtoFormat(id));
+ }
+ }
+
+ @Override
+ public List getContainerIds() {
+ initLocalContainerIds();
+ return this.containerIds;
+ }
+
+ @Override
+ public void setContainerIds(List containerIds) {
+ maybeInitBuilder();
+ if (containerIds == null)
+ builder.clearContainerId();
+ this.containerIds = containerIds;
+ }
+
+ private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) {
+ return new ContainerIdPBImpl(p);
+ }
+
+ private ContainerIdProto convertToProtoFormat(ContainerId t) {
+ return ((ContainerIdPBImpl) t).getProto();
+ }
+
+ @Override
+ public int getSignal() {
+ SignalContainersRequestProtoOrBuilder p = viaProto ? proto : builder;
+ if (this.signal != null) {
+ return this.signal;
+ }
+ this.signal = p.getSignal();
+ return this.signal;
+ }
+
+ @Override
+ public void setSignal(int signal) {
+ maybeInitBuilder();
+ this.signal = signal;
+ }
+
+}
\ No newline at end of file
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SignalContainersResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SignalContainersResponsePBImpl.java
new file mode 100644
index 0000000..40d88d3
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SignalContainersResponsePBImpl.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainersResponse;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.SerializedException;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl;
+import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.SerializedExceptionProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.ContainerExceptionMapProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainersResponseProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainersResponseProtoOrBuilder;
+
+import com.google.protobuf.TextFormat;
+
+@Private
+@Unstable
+public class SignalContainersResponsePBImpl extends
+ SignalContainersResponse {
+ SignalContainersResponseProto proto = SignalContainersResponseProto
+ .getDefaultInstance();
+ SignalContainersResponseProto.Builder builder = null;
+ boolean viaProto = false;
+
+ private Map failedRequests = null;
+
+ public SignalContainersResponsePBImpl() {
+ builder = SignalContainersResponseProto.newBuilder();
+ }
+
+ public SignalContainersResponsePBImpl(
+ SignalContainersResponseProto proto) {
+ this.proto = proto;
+ viaProto = true;
+ }
+
+ public SignalContainersResponseProto getProto() {
+ mergeLocalToProto();
+ proto = viaProto ? proto : builder.build();
+ viaProto = true;
+ return proto;
+ }
+
+ @Override
+ public int hashCode() {
+ return getProto().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == null)
+ return false;
+ if (other.getClass().isAssignableFrom(this.getClass())) {
+ return this.getProto().equals(this.getClass().cast(other).getProto());
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return TextFormat.shortDebugString(getProto());
+ }
+
+ private void mergeLocalToBuilder() {
+ if (this.failedRequests != null) {
+ addFailedRequestsToProto();
+ }
+ }
+
+ private void mergeLocalToProto() {
+ if (viaProto)
+ maybeInitBuilder();
+ mergeLocalToBuilder();
+ proto = builder.build();
+ viaProto = true;
+ }
+
+ private void maybeInitBuilder() {
+ if (viaProto || builder == null) {
+ builder = SignalContainersResponseProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+
+ private void addFailedRequestsToProto() {
+ maybeInitBuilder();
+ builder.clearFailedRequests();
+ if (this.failedRequests == null)
+ return;
+ List protoList =
+ new ArrayList();
+ for (Map.Entry entry : this.failedRequests
+ .entrySet()) {
+ protoList.add(ContainerExceptionMapProto.newBuilder()
+ .setContainerId(convertToProtoFormat(entry.getKey()))
+ .setException(convertToProtoFormat(entry.getValue())).build());
+ }
+ builder.addAllFailedRequests(protoList);
+ }
+
+ private void initFailedRequests() {
+ if (this.failedRequests != null) {
+ return;
+ }
+ SignalContainersResponseProtoOrBuilder p = viaProto ? proto : builder;
+ List protoList = p.getFailedRequestsList();
+ this.failedRequests = new HashMap();
+ for (ContainerExceptionMapProto ce : protoList) {
+ this.failedRequests.put(convertFromProtoFormat(ce.getContainerId()),
+ convertFromProtoFormat(ce.getException()));
+ }
+ }
+
+ @Override
+ public Map getFailedRequests() {
+ initFailedRequests();
+ return this.failedRequests;
+ }
+
+ @Override
+ public void setFailedRequests(
+ Map failedRequests) {
+ maybeInitBuilder();
+ if (failedRequests == null)
+ builder.clearFailedRequests();
+ this.failedRequests = failedRequests;
+ }
+
+ private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) {
+ return new ContainerIdPBImpl(p);
+ }
+
+ private ContainerIdProto convertToProtoFormat(ContainerId t) {
+ return ((ContainerIdPBImpl) t).getProto();
+ }
+
+ private SerializedExceptionPBImpl convertFromProtoFormat(
+ SerializedExceptionProto p) {
+ return new SerializedExceptionPBImpl(p);
+ }
+
+ private SerializedExceptionProto convertToProtoFormat(SerializedException t) {
+ return ((SerializedExceptionPBImpl) t).getProto();
+ }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java
index 8fe5c3c..0b9a15d 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java
@@ -35,6 +35,8 @@
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
@@ -165,5 +167,11 @@ public GetContainerStatusesResponse getContainerStatuses(
GetContainerStatusesResponse.newInstance(list, null);
return null;
}
+
+ @Override
+ public SignalContainersResponse signalContainers(
+ SignalContainersRequest request) throws YarnException, IOException {
+ return null;
+ }
}
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java
index 76384d3..4123f6a 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java
@@ -38,6 +38,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
@@ -218,6 +220,12 @@ public StopContainersResponse stopContainers(StopContainersRequest request)
new Exception(EXCEPTION_CAUSE));
throw new YarnException(e);
}
+
+ @Override
+ public SignalContainersResponse signalContainers(
+ SignalContainersRequest request) throws YarnException, IOException {
+ return null;
+ }
}
public static ContainerTokenIdentifier newContainerTokenIdentifier(
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
index 3d3aefd..f5845c9 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
@@ -164,6 +164,14 @@ public int getValue() {
public String toString() {
return str;
}
+ public static Signal valueOf(int signal_num)
+ throws IllegalArgumentException {
+ for (Signal s : Signal.values()) {
+ if (s.value == signal_num)
+ return s;
+ }
+ throw new IllegalArgumentException();
+ }
}
protected void logOutput(String output) {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index 0af4332..6fc2959 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -53,6 +53,8 @@
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
@@ -82,6 +84,7 @@
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
@@ -99,6 +102,7 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.SignalContainersLauncherEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService;
@@ -762,4 +766,50 @@ public Context getContext() {
public Map getAuxServiceMetaData() {
return this.auxiliaryServices.getMetaData();
}
+
+ @Override
+ public SignalContainersResponse signalContainers(
+ SignalContainersRequest request) throws YarnException, IOException {
+ Map failedRequests =
+ new HashMap();
+ UserGroupInformation remoteUgi = getRemoteUgi();
+ NMTokenIdentifier identifier = selectNMTokenIdentifier(remoteUgi);
+ for (ContainerId id : request.getContainerIds()) {
+ try {
+ signalContainerInternal(request.getSignal(), id, identifier);
+ } catch (YarnException e) {
+ failedRequests.put(id, SerializedException.newInstance(e));
+ }
+ }
+ return SignalContainersResponse.newInstance(failedRequests);
+ }
+
+ private void signalContainerInternal(int signal_num, ContainerId containerID,
+ NMTokenIdentifier nmTokenIdentifier) throws YarnException {
+ String containerIDStr = containerID.toString();
+ Container container = this.context.getContainers().get(containerID);
+
+ Signal signal;
+ try {
+ signal = Signal.valueOf(signal_num);
+ } catch (IllegalArgumentException e) {
+ throw new YarnException("Non supported signal " + signal_num);
+ }
+
+ LOG.info("Sending signal " + signal + " to container " + containerIDStr);
+ authorizeGetAndStopContainerRequest(containerID, container, false,
+ nmTokenIdentifier);
+
+ if (container == null) {
+ if (nodeStatusUpdater.isContainerRecentlyStopped(containerID)) {
+ throw RPCUtil.getRemoteException("Container " + containerIDStr
+ + " was recently stopped on node manager.");
+ } else {
+ throw RPCUtil.getRemoteException("Container " + containerIDStr
+ + " is not handled by this NodeManager");
+ }
+ }
+ this.containersLauncher.handle(
+ new SignalContainersLauncherEvent(container, signal));
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
index edc3146..646fd82 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
@@ -403,6 +403,66 @@ public void cleanupContainer() throws IOException {
}
/**
+ * Send a signal to the container.
+ *
+ * Acceptable signals: {@link Signal#QUIT}
+ *
+ * @throws IOException
+ */
+ @SuppressWarnings("unchecked") // dispatcher not typed
+ public void signalContainer(Signal signal) throws IOException {
+ ContainerId containerId = container.getContainerId();
+ String containerIdStr = ConverterUtils.toString(containerId);
+ String user = container.getUser();
+
+ if (signal != Signal.QUIT) {
+ LOG.debug("Not sending non supported signal " + signal
+ + " as user " + user
+ + " for container " + containerIdStr);
+ return;
+ }
+
+ LOG.info("Sending signal " + signal + " to container " + containerIdStr);
+
+ boolean alreadyLaunched = !shouldLaunchContainer.compareAndSet(false, true);
+ if (!alreadyLaunched) {
+ LOG.info("Container " + containerIdStr + " not launched."
+ + " Not sending the signal");
+ return;
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Getting pid for container " + containerIdStr
+ + " to send signal to from pid file "
+ + (pidFilePath != null ? pidFilePath.toString() : "null"));
+ }
+
+ try {
+
+ // get process id from pid file if available
+ // else if shell is still active, get it from the shell
+ String processId = null;
+ if (pidFilePath != null) {
+ processId = getContainerPid(pidFilePath);
+ }
+
+ if (processId != null) {
+ LOG.debug("Sending signal " + signal + " to pid " + processId
+ + " as user " + user
+ + " for container " + containerIdStr);
+ exec.signalContainer(user, processId, signal);
+ }
+ } catch (Exception e) {
+ String message =
+ "Exception when sending signal to container " + containerIdStr
+ + ": " + StringUtils.stringifyException(e);
+ LOG.warn(message);
+ dispatcher.getEventHandler().handle(
+ new ContainerDiagnosticsUpdateEvent(containerId, message));
+ }
+ }
+
+ /**
* Loop through for a time-bounded interval waiting to
* read the process id from a file generated by a running process.
* @param pidFilePath File from which to read the process id
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
index ce865e3..9c53366 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
@@ -141,6 +141,21 @@ public void handle(ContainersLauncherEvent event) {
+ ". Ignoring.");
}
break;
+ case SIGNAL_CONTAINER:
+ SignalContainersLauncherEvent signalEvent =
+ (SignalContainersLauncherEvent) event;
+ launcher = running.get(containerId);
+ if (launcher == null) {
+ // Container not launched. So nothing needs to be done.
+ return;
+ }
+ try {
+ launcher.signalContainer(signalEvent.getSignal());
+ } catch (IOException e) {
+ LOG.warn("Got exception while cleaning container " + containerId
+ + ". Ignoring.");
+ }
+ break;
}
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java
index 6793bf7..92e1fcc 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java
@@ -21,4 +21,5 @@
public enum ContainersLauncherEventType {
LAUNCH_CONTAINER,
CLEANUP_CONTAINER, // The process(grp) itself.
+ SIGNAL_CONTAINER,
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/SignalContainersLauncherEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/SignalContainersLauncherEvent.java
new file mode 100644
index 0000000..6848533
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/SignalContainersLauncherEvent.java
@@ -0,0 +1,38 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher;
+
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+
+public class SignalContainersLauncherEvent
+ extends ContainersLauncherEvent {
+
+ private final Signal signal;
+
+ public SignalContainersLauncherEvent(Container container, Signal signal) {
+ super(container, ContainersLauncherEventType.SIGNAL_CONTAINER);
+ this.signal = signal;
+ }
+
+ public Signal getSignal() {
+ return signal;
+ }
+
+}
\ No newline at end of file
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java
index cc9b7d9..0180a8d 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java
@@ -97,6 +97,18 @@ public void testContainerManagerInitialization() throws IOException {
}
@Override
+ public void testSignalContainer() throws IOException, InterruptedException,
+ YarnException {
+ // Don't run the test if the binary is not available.
+ if (!shouldRunTest()) {
+ LOG.info("LCE binary path is not passed. Not running the test");
+ return;
+ }
+ LOG.info("Running testSignalContainer");
+ super.testSignalContainer();
+ }
+
+ @Override
public void testContainerLaunchAndStop() throws IOException,
InterruptedException, YarnException {
// Don't run the test if the binary is not available.
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
index b02054c..4baee5a 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
@@ -261,21 +261,26 @@ public static void waitForContainerState(ContainerManagementProtocol containerMa
GetContainerStatusesRequest request =
GetContainerStatusesRequest.newInstance(list);
ContainerStatus containerStatus =
- containerManager.getContainerStatuses(request).getContainerStatuses()
- .get(0);
+ getContainerStatus(containerManager, request);
int timeoutSecs = 0;
while (!containerStatus.getState().equals(finalState)
&& timeoutSecs++ < timeOutMax) {
Thread.sleep(1000);
LOG.info("Waiting for container to get into state " + finalState
+ ". Current state is " + containerStatus.getState());
- containerStatus = containerManager.getContainerStatuses(request).getContainerStatuses().get(0);
+ containerStatus = getContainerStatus(containerManager, request);
}
LOG.info("Container state is " + containerStatus.getState());
Assert.assertEquals("ContainerState is not correct (timedout)",
finalState, containerStatus.getState());
}
+ public static ContainerStatus getContainerStatus(
+ ContainerManagementProtocol containerManager,
+ GetContainerStatusesRequest request) throws YarnException, IOException {
+ return containerManager.getContainerStatuses(request).getContainerStatuses().get(0);
+ }
+
static void waitForApplicationState(ContainerManagerImpl containerManager,
ApplicationId appID, ApplicationState finalState)
throws InterruptedException {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/SleepTool.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/SleepTool.java
new file mode 100644
index 0000000..ce3506a
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/SleepTool.java
@@ -0,0 +1,32 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager;
+
+/**
+ * A public class with the main method with just sleeps for 100 seconds.
+ *
+ * Used as a yarn container stub in {@link TestContainerManager#testSignalContainer()}
+ */
+public class SleepTool {
+
+ public static void main(String[] args) throws Exception {
+ Thread.sleep(100*1000);
+ }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
index e5b318e..24f12e2 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
@@ -26,9 +26,11 @@
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.regex.Pattern;
import junit.framework.Assert;
@@ -40,6 +42,7 @@
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
@@ -66,6 +69,7 @@
import org.apache.hadoop.yarn.server.api.ResourceManagerConstants;
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
@@ -79,6 +83,9 @@
import org.junit.Before;
import org.junit.Test;
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+
public class TestContainerManager extends BaseContainerManagerTest {
public TestContainerManager() throws UnsupportedFileSystemException {
@@ -245,92 +252,34 @@ public void testContainerSetup() throws Exception {
// Now verify the contents of the file
BufferedReader reader = new BufferedReader(new FileReader(targetFile));
- Assert.assertEquals("Hello World!", reader.readLine());
- Assert.assertEquals(null, reader.readLine());
+ try {
+ Assert.assertEquals("Hello World!", reader.readLine());
+ Assert.assertEquals(null, reader.readLine());
+ } finally {
+ reader.close();
+ }
}
@Test
public void testContainerLaunchAndStop() throws IOException,
InterruptedException, YarnException {
- containerManager.start();
-
- File scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile");
- PrintWriter fileWriter = new PrintWriter(scriptFile);
- File processStartFile =
- new File(tmpDir, "start_file.txt").getAbsoluteFile();
-
- // ////// Construct the Container-id
+ // Construct the Container-id
ContainerId cId = createContainerId(0);
-
- if (Shell.WINDOWS) {
- fileWriter.println("@echo Hello World!> " + processStartFile);
- fileWriter.println("@echo " + cId + ">> " + processStartFile);
- fileWriter.println("@ping -n 100 127.0.0.1 >nul");
- } else {
- fileWriter.write("\numask 0"); // So that start file is readable by the test
- fileWriter.write("\necho Hello World! > " + processStartFile);
- fileWriter.write("\necho $$ >> " + processStartFile);
- fileWriter.write("\nexec sleep 100");
- }
- fileWriter.close();
-
- ContainerLaunchContext containerLaunchContext =
- recordFactory.newRecordInstance(ContainerLaunchContext.class);
-
- URL resource_alpha =
- ConverterUtils.getYarnUrlFromPath(localFS
- .makeQualified(new Path(scriptFile.getAbsolutePath())));
- LocalResource rsrc_alpha =
- recordFactory.newRecordInstance(LocalResource.class);
- rsrc_alpha.setResource(resource_alpha);
- rsrc_alpha.setSize(-1);
- rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION);
- rsrc_alpha.setType(LocalResourceType.FILE);
- rsrc_alpha.setTimestamp(scriptFile.lastModified());
- String destinationFile = "dest_file";
- Map localResources =
- new HashMap();
- localResources.put(destinationFile, rsrc_alpha);
- containerLaunchContext.setLocalResources(localResources);
- List commands = Arrays.asList(Shell.getRunScriptCommand(scriptFile));
- containerLaunchContext.setCommands(commands);
-
- StartContainerRequest scRequest =
- StartContainerRequest.newInstance(containerLaunchContext,
- createContainerToken(cId,
- DUMMY_RM_IDENTIFIER, context.getNodeId(), user,
- context.getContainerTokenSecretManager()));
- List list = new ArrayList();
- list.add(scRequest);
- StartContainersRequest allRequests =
- StartContainersRequest.newInstance(list);
- containerManager.startContainers(allRequests);
-
- int timeoutSecs = 0;
- while (!processStartFile.exists() && timeoutSecs++ < 20) {
- Thread.sleep(1000);
- LOG.info("Waiting for process start-file to be created");
- }
- Assert.assertTrue("ProcessStartFile doesn't exist!",
- processStartFile.exists());
- // Now verify the contents of the file
- BufferedReader reader =
- new BufferedReader(new FileReader(processStartFile));
- Assert.assertEquals("Hello World!", reader.readLine());
- // Get the pid of the process
- String pid = reader.readLine().trim();
- // No more lines
- Assert.assertEquals(null, reader.readLine());
-
- // Now test the stop functionality.
-
- // Assert that the process is alive
- Assert.assertTrue("Process is not alive!",
- DefaultContainerExecutor.containerIsAlive(pid));
- // Once more
- Assert.assertTrue("Process is not alive!",
- DefaultContainerExecutor.containerIsAlive(pid));
+ ShellScriptResourcesProvider resourcesProvider = new ShellScriptResourcesProvider(cId) {
+
+ @Override
+ public void appendCommands(PrintWriter scriptWriter) {
+ if (Shell.WINDOWS) {
+ scriptWriter.println("@ping -n 100 127.0.0.1 >nul");
+ } else {
+ scriptWriter.write("\nexec sleep 100");
+ }
+ }
+
+ };
+ startContainer(cId, resourcesProvider);
+ String pid = resourcesProvider.waitForStartFile();
List containerIds = new ArrayList();
containerIds.add(cId);
@@ -352,67 +301,128 @@ public void testContainerLaunchAndStop() throws IOException,
Assert.assertFalse("Process is still alive!",
DefaultContainerExecutor.containerIsAlive(pid));
}
-
- private void testContainerLaunchAndExit(int exitCode) throws IOException,
+
+ @Test
+ public void testSignalContainer() throws IOException,
InterruptedException, YarnException {
+ // Construct the Container-id
+ ContainerId cId = createContainerId(0);
- File scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile");
- PrintWriter fileWriter = new PrintWriter(scriptFile);
- File processStartFile =
- new File(tmpDir, "start_file.txt").getAbsoluteFile();
-
- // ////// Construct the Container-id
- ContainerId cId = createContainerId(0);
-
- if (Shell.WINDOWS) {
- fileWriter.println("@echo Hello World!> " + processStartFile);
- fileWriter.println("@echo " + cId + ">> " + processStartFile);
- if (exitCode != 0) {
- fileWriter.println("@exit " + exitCode);
- }
- } else {
- fileWriter.write("\numask 0"); // So that start file is readable by the test
- fileWriter.write("\necho Hello World! > " + processStartFile);
- fileWriter.write("\necho $$ >> " + processStartFile);
- // Have script throw an exit code at the end
- if (exitCode != 0) {
- fileWriter.write("\nexit "+exitCode);
- }
- }
-
- fileWriter.close();
-
- ContainerLaunchContext containerLaunchContext =
- recordFactory.newRecordInstance(ContainerLaunchContext.class);
-
- URL resource_alpha =
- ConverterUtils.getYarnUrlFromPath(localFS
- .makeQualified(new Path(scriptFile.getAbsolutePath())));
- LocalResource rsrc_alpha =
- recordFactory.newRecordInstance(LocalResource.class);
- rsrc_alpha.setResource(resource_alpha);
- rsrc_alpha.setSize(-1);
- rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION);
- rsrc_alpha.setType(LocalResourceType.FILE);
- rsrc_alpha.setTimestamp(scriptFile.lastModified());
- String destinationFile = "dest_file";
- Map localResources =
- new HashMap();
- localResources.put(destinationFile, rsrc_alpha);
- containerLaunchContext.setLocalResources(localResources);
- List commands = Arrays.asList(Shell.getRunScriptCommand(scriptFile));
- containerLaunchContext.setCommands(commands);
+ final File logFile = new File(tmpDir, "testSignalContainer.out")
+ .getAbsoluteFile();
+
+ ShellScriptResourcesProvider resourcesProvider =
+ new ShellScriptResourcesProvider(cId) {
+
+ @Override
+ void appendCommands(PrintWriter scriptWriter) {
+ File testClassesDir = new File("target", "test-classes")
+ .getAbsoluteFile();
+ String cmd = "java -classpath " + testClassesDir + " "
+ + SleepTool.class.getCanonicalName()
+ + " >" + logFile + " 2>&1";
+ if (Shell.WINDOWS) {
+ scriptWriter.println(cmd);
+ } else {
+ scriptWriter.write("\nexec " + cmd);
+ }
+ }
+
+ };
+ startContainer(cId, resourcesProvider);
+ resourcesProvider.waitForStartFile();
+
+ List containerIds = new ArrayList();
+ containerIds.add(cId);
+ SignalContainersRequest signalRequest =
+ SignalContainersRequest.newInstance(containerIds, Signal.QUIT.getValue());
+ containerManager.signalContainers(signalRequest);
+
+ GetContainerStatusesRequest gcsRequest =
+ GetContainerStatusesRequest.newInstance(containerIds);
+
+ Thread.sleep(100);
+ ContainerStatus containerStatus =
+ BaseContainerManagerTest.getContainerStatus(containerManager, gcsRequest);
+ Assert.assertEquals("Container state changed", ContainerState.RUNNING,
+ containerStatus.getState());
+
+ String containerLog = Files.toString(logFile, Charsets.US_ASCII);
+ System.out.println(containerLog);
+ Assert.assertTrue(
+ Pattern.compile("Full thread dump").matcher(containerLog).find());
+
+ StopContainersRequest stopRequest =
+ StopContainersRequest.newInstance(containerIds);
+ containerManager.stopContainers(stopRequest);
+ BaseContainerManagerTest.waitForContainerState(containerManager, cId,
+ ContainerState.COMPLETE);
+ }
+
+ private void startContainer(ContainerId cId, LocalResourcesProvider resourcesProvider)
+ throws IOException, YarnException, InterruptedException {
+ containerManager.start();
+
+ ContainerLaunchContext containerLaunchContext =
+ recordFactory.newRecordInstance(ContainerLaunchContext.class);
+
+ Map localResources =
+ new HashMap();
+
+ for (Map.Entry e : resourcesProvider.getLocalResources().entrySet()) {
+ String destinationFile = e.getKey();
+ File localResource = e.getValue();
+
+ URL resource_alpha =
+ ConverterUtils.getYarnUrlFromPath(localFS
+ .makeQualified(new Path(localResource.getAbsolutePath())));
+ LocalResource rsrc_alpha =
+ recordFactory.newRecordInstance(LocalResource.class);
+ rsrc_alpha.setResource(resource_alpha);
+ rsrc_alpha.setSize(-1);
+ rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION);
+ rsrc_alpha.setType(LocalResourceType.FILE);
+ rsrc_alpha.setTimestamp(localResource.lastModified());
+ localResources.put(destinationFile, rsrc_alpha);
+ }
+ containerLaunchContext.setLocalResources(localResources);
+
+ List commands = Arrays.asList(
+ Shell.getRunScriptCommand(resourcesProvider.getScriptFile()));
+ containerLaunchContext.setCommands(commands);
StartContainerRequest scRequest =
- StartContainerRequest.newInstance(
- containerLaunchContext,
- createContainerToken(cId, DUMMY_RM_IDENTIFIER, context.getNodeId(),
- user, context.getContainerTokenSecretManager()));
+ StartContainerRequest.newInstance(containerLaunchContext,
+ createContainerToken(cId,
+ DUMMY_RM_IDENTIFIER, context.getNodeId(), user,
+ context.getContainerTokenSecretManager()));
List list = new ArrayList();
list.add(scRequest);
StartContainersRequest allRequests =
StartContainersRequest.newInstance(list);
containerManager.startContainers(allRequests);
+ }
+
+ private void testContainerLaunchAndExit(final int exitCode) throws IOException,
+ InterruptedException, YarnException {
+ // Construct the Container-id
+ ContainerId cId = createContainerId(0);
+
+ LocalResourcesProvider resourcesProvider =
+ new ShellScriptResourcesProvider(cId) {
+
+ @Override
+ void appendCommands(PrintWriter scriptWriter) {
+ if (exitCode != 0) {
+ if (Shell.WINDOWS) {
+ scriptWriter.println("@exit " + exitCode);
+ } else {
+ scriptWriter.write("\nexit "+exitCode);
+ }
+ }
+ }
+ };
+ startContainer(cId, resourcesProvider);
BaseContainerManagerTest.waitForContainerState(containerManager, cId,
ContainerState.COMPLETE);
@@ -761,4 +771,80 @@ public static Token createContainerToken(ContainerId cId, long rmIdentifier,
containerTokenIdentifier);
return containerToken;
}
+
+ private static interface LocalResourcesProvider {
+ Map getLocalResources() throws IOException;
+ File getScriptFile() throws IOException;
+ }
+
+ private static abstract class ShellScriptResourcesProvider
+ implements LocalResourcesProvider {
+
+ final File scriptFile;
+ final File processStartFile;
+
+ public ShellScriptResourcesProvider(ContainerId cId) throws IOException {
+ this.scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile");
+ PrintWriter scriptWriter = new PrintWriter(scriptFile);
+ this.processStartFile =
+ new File(tmpDir, "start_file.txt").getAbsoluteFile();
+
+ if (Shell.WINDOWS) {
+ scriptWriter.println("@echo Hello World!> " + processStartFile);
+ scriptWriter.println("@echo " + cId + ">> " + processStartFile);
+ } else {
+ scriptWriter.write("\numask 0"); // So that start file is readable by the test
+ scriptWriter.write("\necho Hello World! > " + processStartFile);
+ scriptWriter.write("\necho $$ >> " + processStartFile);
+ }
+ appendCommands(scriptWriter);
+ scriptWriter.close();
+ }
+
+ abstract void appendCommands(PrintWriter scriptWriter);
+
+ @Override
+ public Map getLocalResources() throws IOException {
+ return Collections.singletonMap("dest_file", scriptFile);
+ }
+
+ @Override
+ public File getScriptFile() throws IOException {
+ return scriptFile;
+ }
+
+ public String waitForStartFile() throws IOException, InterruptedException {
+ int timeoutSecs = 0;
+ while (!processStartFile.exists() && timeoutSecs++ < 20) {
+ Thread.sleep(1000);
+ LOG.info("Waiting for process start-file to be created");
+ }
+ Assert.assertTrue("ProcessStartFile doesn't exist!",
+ processStartFile.exists());
+
+ // Now verify the contents of the file
+ BufferedReader reader =
+ new BufferedReader(new FileReader(processStartFile));
+ try {
+ Assert.assertEquals("Hello World!", reader.readLine());
+ // Get the pid of the process
+ String pid = reader.readLine().trim();
+ // No more lines
+ Assert.assertEquals(null, reader.readLine());
+
+ // Assert that the process is alive
+ Assert.assertTrue("Process is not alive!",
+ DefaultContainerExecutor.containerIsAlive(pid));
+ // Once more
+ Assert.assertTrue("Process is not alive!",
+ DefaultContainerExecutor.containerIsAlive(pid));
+
+ return pid;
+ } finally {
+ reader.close();
+ }
+ }
+
+ }
+
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java
index 7f4d3f0..d8e8273 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -33,6 +34,8 @@
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
@@ -45,6 +48,7 @@
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.SerializedException;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
@@ -314,4 +318,11 @@ synchronized public GetContainerStatusesResponse getContainerStatuses(
nodeStatus.setNodeHealthStatus(nodeHealthStatus);
return nodeStatus;
}
+
+ @Override
+ public SignalContainersResponse signalContainers(
+ SignalContainersRequest request) throws YarnException, IOException {
+ return SignalContainersResponse.newInstance(
+ Collections.emptyMap());
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAMAuthorization.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAMAuthorization.java
index a9f1c1a..40ef848 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAMAuthorization.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAMAuthorization.java
@@ -43,6 +43,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
@@ -129,6 +131,12 @@ public Credentials getContainerCredentials() throws IOException {
credentials.readTokenStorageStream(buf);
return credentials;
}
+
+ @Override
+ public SignalContainersResponse signalContainers(
+ SignalContainersRequest request) throws YarnException, IOException {
+ return SignalContainersResponse.newInstance(null);
+ }
}
public static class MockRMWithAMS extends MockRMWithCustomAMLauncher {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java
index 64e5cc9..6c77e74 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java
@@ -31,6 +31,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
@@ -118,6 +120,12 @@ public GetContainerStatusesResponse getContainerStatuses(
GetContainerStatusesRequest request) throws YarnException {
return null;
}
+
+ @Override
+ public SignalContainersResponse signalContainers(
+ SignalContainersRequest request) throws YarnException, IOException {
+ return null;
+ }
}
@Test