diff --git BUILDING.txt BUILDING.txt
index 621a221..1a5f7c8 100644
--- BUILDING.txt
+++ BUILDING.txt
@@ -222,18 +222,22 @@ Requirements:
* Findbugs 1.3.9 (if running findbugs)
* ProtocolBuffer 2.5.0
* CMake 2.6 or newer
-* Windows SDK or Visual Studio 2010 Professional
+* Windows SDK 7.1 or Visual Studio 2010 Professional
* Unix command-line tools from GnuWin32 or Cygwin: sh, mkdir, rm, cp, tar, gzip
* zlib headers (if building native code bindings for zlib)
* Internet connection for first build (to fetch all Maven and Hadoop dependencies)
If using Visual Studio, it must be Visual Studio 2010 Professional (not 2012).
Do not use Visual Studio Express. It does not support compiling for 64-bit,
-which is problematic if running a 64-bit system. The Windows SDK is free to
+which is problematic if running a 64-bit system. The Windows SDK 7.1 is free to
download here:
http://www.microsoft.com/en-us/download/details.aspx?id=8279
+The Windows SDK 8.1 is available to download at:
+
+http://msdn.microsoft.com/en-us/windows/bg162891.aspx
+
----------------------------------------------------------------------------------
Building:
diff --git hadoop-common-project/hadoop-common/src/main/winutils/task.c hadoop-common-project/hadoop-common/src/main/winutils/task.c
index bfdbd63..f83ded2 100644
--- hadoop-common-project/hadoop-common/src/main/winutils/task.c
+++ hadoop-common-project/hadoop-common/src/main/winutils/task.c
@@ -43,13 +43,37 @@ typedef enum TaskCommandOptionType
{
TaskInvalid,
TaskCreate,
- TaskCreateAsUser,
TaskIsAlive,
TaskKill,
TaskProcessList
} TaskCommandOption;
//----------------------------------------------------------------------------
+// Function: GetLimit
+//
+// Description:
+// Get the resource limit value in long type given the command line argument.
+//
+// Returns:
+// TRUE: If successfully get the value
+// FALSE: otherwise
+static BOOL GetLimit(__in const wchar_t *str, __out long *value)
+{
+ wchar_t *end = NULL;
+ if (str == NULL || value == NULL) return FALSE;
+ *value = wcstol(str, &end, 10);
+ if (end == NULL || *end != '\0')
+ {
+ *value = -1;
+ return FALSE;
+ }
+ else
+ {
+ return TRUE;
+ }
+}
+
+//----------------------------------------------------------------------------
// Function: ParseCommandLine
//
// Description:
@@ -61,7 +85,11 @@ typedef enum TaskCommandOptionType
// FALSE: otherwise
static BOOL ParseCommandLine(__in int argc,
__in_ecount(argc) wchar_t *argv[],
- __out TaskCommandOption *command)
+ __out TaskCommandOption *command,
+ __out_opt long *memory,
+ __out_opt long *vcore,
+ __out_opt PCWSTR *user,
+ __out_opt PCWSTR *pidFilePath)
{
*command = TaskInvalid;
@@ -88,22 +116,60 @@ static BOOL ParseCommandLine(__in int argc,
}
}
- if (argc == 4) {
+ if (argc >= 4) {
if (wcscmp(argv[1], L"create") == 0)
{
+ int i;
+ for (i = 2; i < argc - 3; i++)
+ {
+ if (wcscmp(argv[i], L"-c") == 0)
+ {
+ if (vcore != NULL && !GetLimit(argv[i + 1], vcore))
+ {
+ return FALSE;
+ }
+ else
+ {
+ i++;
+ continue;
+ }
+ }
+ else if (wcscmp(argv[i], L"-m") == 0)
+ {
+ if (memory != NULL && !GetLimit(argv[i + 1], memory))
+ {
+ return FALSE;
+ }
+ else
+ {
+ i++;
+ continue;
+ }
+ }
+ else if (wcscmp(argv[i], L"-u") == 0)
+ {
+ if (user == NULL) return FALSE;
+ *user = argv[++i];
+ continue;
+ }
+ else if (wcscmp(argv[i], L"-p") == 0)
+ {
+ if (pidFilePath == NULL) return FALSE;
+ *pidFilePath = argv[++i];
+ continue;
+ }
+ else
+ {
+ break;
+ }
+ }
+ if (argc - i != 2)
+ return FALSE;
*command = TaskCreate;
return TRUE;
}
}
- if (argc >= 6) {
- if (wcscmp(argv[1], L"createAsUser") == 0)
- {
- *command = TaskCreateAsUser;
- return TRUE;
- }
- }
-
return FALSE;
}
@@ -564,8 +630,8 @@ done:
// Function: CreateTaskImpl
//
// Description:
-// Creates a task via a jobobject. Outputs the
-// appropriate information to stdout on success, or stderr on failure.
+// Creates a task via a jobobject, optionally setting CPU and memory limits.
+// Outputs the appropriate information to stdout on success, or stderr on failure.
// logonHandle may be NULL, in this case the current logon will be utilized for the
// created process
//
@@ -573,7 +639,7 @@ done:
// ERROR_SUCCESS: On success
// GetLastError: otherwise
DWORD CreateTaskImpl(__in_opt HANDLE logonHandle, __in PCWSTR jobObjName,__in PCWSTR cmdLine,
- __in LPCWSTR userName)
+ __in LPCWSTR userName, __in long memory, __in long cpuRate)
{
DWORD dwErrorCode = ERROR_SUCCESS;
DWORD exitCode = EXIT_FAILURE;
@@ -616,6 +682,12 @@ DWORD CreateTaskImpl(__in_opt HANDLE logonHandle, __in PCWSTR jobObjName,__in PC
return dwErrorCode;
}
jeli.BasicLimitInformation.LimitFlags = JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE;
+ if (memory > 0)
+ {
+ jeli.BasicLimitInformation.LimitFlags |= JOB_OBJECT_LIMIT_JOB_MEMORY;
+ jeli.ProcessMemoryLimit = memory * 1024 * 1024;
+ jeli.JobMemoryLimit = memory * 1024 * 1024;
+ }
if(SetInformationJobObject(jobObject,
JobObjectExtendedLimitInformation,
&jeli,
@@ -626,6 +698,24 @@ DWORD CreateTaskImpl(__in_opt HANDLE logonHandle, __in PCWSTR jobObjName,__in PC
CloseHandle(jobObject);
return dwErrorCode;
}
+#ifdef NTDDI_WIN8
+ if (cpuRate > 0)
+ {
+ JOBOBJECT_CPU_RATE_CONTROL_INFORMATION jcrci = { 0 };
+ SYSTEM_INFO sysinfo;
+ GetSystemInfo(&sysinfo);
+ jcrci.ControlFlags = JOB_OBJECT_CPU_RATE_CONTROL_ENABLE |
+ JOB_OBJECT_CPU_RATE_CONTROL_HARD_CAP;
+ jcrci.CpuRate = min(10000, cpuRate);
+ if(SetInformationJobObject(jobObject, JobObjectCpuRateControlInformation,
+ &jcrci, sizeof(jcrci)) == 0)
+ {
+ dwErrorCode = GetLastError();
+ CloseHandle(jobObject);
+ return dwErrorCode;
+ }
+ }
+#endif
if (logonHandle != NULL) {
dwErrorCode = AddNodeManagerAndUserACEsToObject(jobObject, userName, JOB_OBJECT_ALL_ACCESS);
@@ -809,10 +899,10 @@ create_process_done:
// Returns:
// ERROR_SUCCESS: On success
// GetLastError: otherwise
-DWORD CreateTask(__in PCWSTR jobObjName,__in PWSTR cmdLine)
+DWORD CreateTask(__in PCWSTR jobObjName,__in PWSTR cmdLine, __in long memory, __in long cpuRate)
{
// call with null logon in order to create tasks utilizing the current logon
- return CreateTaskImpl( NULL, jobObjName, cmdLine, NULL);
+ return CreateTaskImpl( NULL, jobObjName, cmdLine, NULL, memory, cpuRate);
}
//----------------------------------------------------------------------------
@@ -826,7 +916,8 @@ DWORD CreateTask(__in PCWSTR jobObjName,__in PWSTR cmdLine)
// ERROR_SUCCESS: On success
// GetLastError: otherwise
DWORD CreateTaskAsUser(__in PCWSTR jobObjName,
- __in PCWSTR user, __in PCWSTR pidFilePath, __in PCWSTR cmdLine)
+ __in PCWSTR user, __in PCWSTR pidFilePath, __in PCWSTR cmdLine,
+ __in long memory, __in long cpuRate)
{
DWORD err = ERROR_SUCCESS;
DWORD exitCode = EXIT_FAILURE;
@@ -893,7 +984,7 @@ DWORD CreateTaskAsUser(__in PCWSTR jobObjName,
goto done;
}
- err = CreateTaskImpl(logonHandle, jobObjName, cmdLine, user);
+ err = CreateTaskImpl(logonHandle, jobObjName, cmdLine, user, memory, cpuRate);
done:
if( profileIsLoaded ) {
@@ -1095,23 +1186,12 @@ int Task(__in int argc, __in_ecount(argc) wchar_t *argv[])
{
DWORD dwErrorCode = ERROR_SUCCESS;
TaskCommandOption command = TaskInvalid;
- wchar_t* cmdLine = NULL;
- wchar_t buffer[16*1024] = L""; // 32K max command line
- size_t charCountBufferLeft = sizeof(buffer)/sizeof(wchar_t);
- int crtArgIndex = 0;
- size_t argLen = 0;
- size_t wscatErr = 0;
- wchar_t* insertHere = NULL;
+ long memory = -1;
+ long cpuRate = -1;
+ PCWSTR user = NULL;
+ PCWSTR pidFilePath = NULL;
- enum {
- ARGC_JOBOBJECTNAME = 2,
- ARGC_USERNAME,
- ARGC_PIDFILE,
- ARGC_COMMAND,
- ARGC_COMMAND_ARGS
- };
-
- if (!ParseCommandLine(argc, argv, &command)) {
+ if (!ParseCommandLine(argc, argv, &command, &memory, &cpuRate, &user, &pidFilePath)) {
dwErrorCode = ERROR_INVALID_COMMAND_LINE;
fwprintf(stderr, L"Incorrect command line arguments.\n\n");
@@ -1123,60 +1203,27 @@ int Task(__in int argc, __in_ecount(argc) wchar_t *argv[])
{
// Create the task jobobject
//
- dwErrorCode = CreateTask(argv[2], argv[3]);
- if (dwErrorCode != ERROR_SUCCESS)
+ if (user != NULL && pidFilePath != NULL)
{
- ReportErrorCode(L"CreateTask", dwErrorCode);
- goto TaskExit;
+ dwErrorCode = CreateTaskAsUser(argv[argc - 2], user, pidFilePath,
+ argv[argc - 1], memory, cpuRate);
+ if (dwErrorCode != ERROR_SUCCESS)
+ {
+ ReportErrorCode(L"CreateTaskAsUser", dwErrorCode);
+ goto TaskExit;
+ }
}
- } else if (command == TaskCreateAsUser)
- {
- // Create the task jobobject as a domain user
- // createAsUser accepts an open list of arguments. All arguments after the command are
- // to be passed as argumrnts to the command itself.Here we're concatenating all
- // arguments after the command into a single arg entry.
- //
- cmdLine = argv[ARGC_COMMAND];
- if (argc > ARGC_COMMAND_ARGS) {
- crtArgIndex = ARGC_COMMAND;
- insertHere = buffer;
- while (crtArgIndex < argc) {
- argLen = wcslen(argv[crtArgIndex]);
- wscatErr = wcscat_s(insertHere, charCountBufferLeft, argv[crtArgIndex]);
- switch (wscatErr) {
- case 0:
- // 0 means success;
- break;
- case EINVAL:
- dwErrorCode = ERROR_INVALID_PARAMETER;
- goto TaskExit;
- case ERANGE:
- dwErrorCode = ERROR_INSUFFICIENT_BUFFER;
- goto TaskExit;
- default:
- // This case is not MSDN documented.
- dwErrorCode = ERROR_GEN_FAILURE;
- goto TaskExit;
- }
- insertHere += argLen;
- charCountBufferLeft -= argLen;
- insertHere[0] = L' ';
- insertHere += 1;
- charCountBufferLeft -= 1;
- insertHere[0] = 0;
- ++crtArgIndex;
- }
- cmdLine = buffer;
- }
-
- dwErrorCode = CreateTaskAsUser(
- argv[ARGC_JOBOBJECTNAME], argv[ARGC_USERNAME], argv[ARGC_PIDFILE], cmdLine);
- if (dwErrorCode != ERROR_SUCCESS)
+ else
{
- ReportErrorCode(L"CreateTaskAsUser", dwErrorCode);
- goto TaskExit;
+ dwErrorCode = CreateTask(argv[argc - 2], argv[argc - 1], memory, cpuRate);
+ if (dwErrorCode != ERROR_SUCCESS)
+ {
+ ReportErrorCode(L"CreateTask", dwErrorCode);
+ goto TaskExit;
+ }
}
- } else if (command == TaskIsAlive)
+ }
+ else if (command == TaskIsAlive)
{
// Check if task jobobject
//
@@ -1238,18 +1285,31 @@ void TaskUsage()
// jobobject's are being used.
// ProcessTree.isSetsidSupported()
fwprintf(stdout, L"\
- Usage: task create [TASKNAME] [COMMAND_LINE] |\n\
- task createAsUser [TASKNAME] [USERNAME] [PIDFILE] [COMMAND_LINE] |\n\
- task isAlive [TASKNAME] |\n\
- task kill [TASKNAME]\n\
- task processList [TASKNAME]\n\
- Creates a new task jobobject with taskname\n\
- Creates a new task jobobject with taskname as the user provided\n\
- Checks if task jobobject is alive\n\
- Kills task jobobject\n\
- Prints to stdout a list of processes in the task\n\
- along with their resource usage. One process per line\n\
- and comma separated info per process\n\
- ProcessId,VirtualMemoryCommitted(bytes),\n\
- WorkingSetSize(bytes),CpuTime(Millisec,Kernel+User)\n");
+Usage: task create [OPTOINS] [TASKNAME] [COMMAND_LINE]\n\
+ Creates a new task job object with taskname and options to set CPU\n\
+ and memory limits on the job object\n\
+\n\
+ OPTIONS: -c [cup rate] set the cpu rate limit on the job object.\n\
+ -m [memory] set the memory limit on the job object.\n\
+ -u [user name] set the user of the task.\n\
+ -p [pid file] set the pid file of the task.\n\
+ The cpu limit is an integral value of percentage * 100. The memory\n\
+ limit is an integral number of memory in MB. \n\
+ The limit will not be set if 0 or negative value is passed in as\n\
+ parameter(s).\n\
+ The user name and pid file must be both set for the options to\n\
+ take effect; otherwise either setting will be ignored.\n\
+\n\
+ task isAlive [TASKNAME] |\n\
+ Checks if task job object is alive\n\
+\n\
+ task kill [TASKNAME]\n\
+ Kills task job object\n\
+\n\
+ task processList [TASKNAME]\n\
+ Prints to stdout a list of processes in the task\n\
+ along with their resource usage. One process per line\n\
+ and comma separated info per process\n\
+ ProcessId,VirtualMemoryCommitted(bytes),\n\
+ WorkingSetSize(bytes),CpuTime(Millisec,Kernel+User)\n");
}
diff --git hadoop-common-project/hadoop-common/src/main/winutils/win8sdk.props hadoop-common-project/hadoop-common/src/main/winutils/win8sdk.props
new file mode 100644
index 0000000..503b37a
--- /dev/null
+++ hadoop-common-project/hadoop-common/src/main/winutils/win8sdk.props
@@ -0,0 +1,28 @@
+
+
+
+
+
+
+ $(VCInstallDir)bin\x86_amd64;$(VCInstallDir)bin;$(WindowsSdkDir)bin\NETFX 4.0 Tools;$(MSBuildProgramFiles32)\Windows Kits\8.1\bin\x86;$(VSInstallDir)Common7\Tools\bin;$(VSInstallDir)Common7\tools;$(VSInstallDir)Common7\ide;$(MSBuildProgramFiles32)\HTML Help Workshop;$(FrameworkSDKDir)\bin;$(MSBuildToolsPath32);$(VSInstallDir);$(SystemRoot)\SysWow64;$(FxCopDir);$(PATH)
+ $(MSBuildProgramFiles32)\Windows Kits\8.1\Include\um;$(MSBuildProgramFiles32)\Windows Kits\8.1\Include\shared;$(VCInstallDir)include;$(VCInstallDir)atlmfc\include;$(FrameworkSDKDir)\include;
+ $(VCInstallDir)lib\amd64;$(VCInstallDir)atlmfc\lib\amd64;$(MSBuildProgramFiles32)\Windows Kits\8.1\lib\win8\um\x64;$(MSBuildProgramFiles32)\Windows Kits\8.1\Lib\winv6.3\um\x64;$(FrameworkSDKDir)\lib\x64
+ $(VCInstallDir)include;$(VCInstallDir)atlmfc\include;$(MSBuildProgramFiles32)\Windows Kits\8.1\Include\um;$(MSBuildProgramFiles32)\Windows Kits\8.1\Include\shared;$(FrameworkSDKDir)\include;$(MSBuildToolsPath32);$(VCInstallDir)atlmfc\lib;$(VCInstallDir)lib;
+
+
+
diff --git hadoop-common-project/hadoop-common/src/main/winutils/winutils.vcxproj hadoop-common-project/hadoop-common/src/main/winutils/winutils.vcxproj
index d736084..c4ed9b0 100644
--- hadoop-common-project/hadoop-common/src/main/winutils/winutils.vcxproj
+++ hadoop-common-project/hadoop-common/src/main/winutils/winutils.vcxproj
@@ -48,6 +48,9 @@
+
+
+
diff --git hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestWinUtils.java hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestWinUtils.java
index 953039d..cca9bb4 100644
--- hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestWinUtils.java
+++ hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestWinUtils.java
@@ -27,6 +27,7 @@
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
+import java.util.Random;
import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
@@ -545,4 +546,62 @@ public void testTaskCreate() throws IOException {
assertThat(outNumber, containsString(testNumber));
}
+
+ @Test (timeout = 30000)
+ public void testTaskCreateWithLimits() throws IOException {
+ // Generate a unique job id
+ Random rand = new Random();
+ long id = rand.nextLong();
+
+ // Run a task without any limits
+ id = rand.nextLong();
+ String out = Shell.execCommand(Shell.WINUTILS, "task", "create", "-c", "-1", "-m",
+ "-1", "job" + id, "cmd /c echo job" + id);
+ assertTrue(out.trim().equals("job" + id));
+
+ // Run a task with limits (128MB should be enough for a cmd)
+ id = rand.nextLong();
+ out = Shell.execCommand(Shell.WINUTILS, "task", "create", "-c", "10000", "-m",
+ "128", "job" + id, "cmd /c echo job" + id);
+ assertTrue(out.trim().equals("job" + id));
+
+ // Run a task without enough memory
+ try {
+ id = rand.nextLong();
+ out = Shell.execCommand(Shell.WINUTILS, "task", "create", "-m", "128", "job"
+ + id, "java -Xmx256m -version");
+ fail("Failed to get Shell.ExitCodeException with insufficient memory");
+ } catch (Shell.ExitCodeException ece) {
+ assertThat(ece.getExitCode(), is(1));
+ }
+
+ // Run tasks with wrong parameters
+ //
+ try {
+ id = rand.nextLong();
+ Shell.execCommand(Shell.WINUTILS, "task", "create", "-c", "-1", "-m",
+ "-1", "foo", "job" + id, "cmd /c echo job" + id);
+ fail("Failed to get Shell.ExitCodeException with bad parameters");
+ } catch (Shell.ExitCodeException ece) {
+ assertThat(ece.getExitCode(), is(1639));
+ }
+
+ try {
+ id = rand.nextLong();
+ Shell.execCommand(Shell.WINUTILS, "task", "create", "-c", "-m", "-1",
+ "job" + id, "cmd /c echo job" + id);
+ fail("Failed to get Shell.ExitCodeException with bad parameters");
+ } catch (Shell.ExitCodeException ece) {
+ assertThat(ece.getExitCode(), is(1639));
+ }
+
+ try {
+ id = rand.nextLong();
+ Shell.execCommand(Shell.WINUTILS, "task", "create", "-c", "foo",
+ "job" + id, "cmd /c echo job" + id);
+ fail("Failed to get Shell.ExitCodeException with bad parameters");
+ } catch (Shell.ExitCodeException ece) {
+ assertThat(ece.getExitCode(), is(1639));
+ }
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 1e7d544..8593719 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -892,8 +892,8 @@
Percentage of CPU that can be allocated
for containers. This setting allows users to limit the amount of
- CPU that YARN containers use. Currently functional only
- on Linux using cgroups. The default is to use 100% of CPU.
+ CPU that YARN containers use. Currently functional on Linux using cgroups
+ or Windows using job objects. The default is to use 100% of CPU.
yarn.nodemanager.resource.percentage-physical-cpu-limit
100
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
index 4ce1a75..eb1d953 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
@@ -38,6 +38,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
@@ -274,7 +275,12 @@ protected Path getPidFilePath(ContainerId containerId) {
readLock.unlock();
}
}
-
+
+ protected String[] getRunCommand(String command, String groupId,
+ String userName, Path pidFile, Configuration conf) {
+ return getRunCommand(command, groupId, userName, pidFile, conf, null);
+ }
+
/**
* Return a command to execute the given command in OS shell.
* On Windows, the passed in groupId can be used to launch
@@ -282,7 +288,7 @@ protected Path getPidFilePath(ContainerId containerId) {
* non-Windows, groupId is ignored.
*/
protected String[] getRunCommand(String command, String groupId,
- String userName, Path pidFile, Configuration conf) {
+ String userName, Path pidFile, Configuration conf, Resource resource) {
boolean containerSchedPriorityIsSet = false;
int containerSchedPriorityAdjustment =
YarnConfiguration.DEFAULT_NM_CONTAINER_EXECUTOR_SCHED_PRIORITY;
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/WindowsContainerExecutor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/WindowsContainerExecutor.java
new file mode 100644
index 0000000..3c7b779
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/WindowsContainerExecutor.java
@@ -0,0 +1,55 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.yarn.server.nodemanager;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils;
+import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
+
+public class WindowsContainerExecutor extends DefaultContainerExecutor {
+
+ @Override
+ protected String[] getRunCommand(String command, String groupId,
+ String userName, Path pidFile, Configuration conf, Resource resource) {
+ if (!Shell.WINDOWS)
+ return null;
+
+ int cpuRate = -1;
+ int memory = -1;
+ if (resource != null) {
+ memory = resource.getMemory();
+
+ int vcores = resource.getVirtualCores();
+ // cap overall usage to the number of cores allocated to YARN
+ float yarnProcessors = NodeManagerHardwareUtils.getContainersCores(
+ ResourceCalculatorPlugin.getResourceCalculatorPlugin(null, conf),
+ conf);
+ // CPU should be set to a percentage * 100, e.g. 20% cpu rate limit
+ // should be set as 20 * 100. The following setting is equal to:
+ // 100 * (100 * (vcores / Total # of cores allocated to YARN))
+ cpuRate = Math.min(10000, (int) ((vcores * 10000) / yarnProcessors));
+ }
+
+ return new String[] { Shell.WINUTILS, "task", "create", "-m",
+ String.valueOf(memory), "-c", String.valueOf(cpuRate), groupId,
+ "cmd /c " + command };
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/WindowsSecureContainerExecutor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/WindowsSecureContainerExecutor.java
index ec9c65e..42c9d64 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/WindowsSecureContainerExecutor.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/WindowsSecureContainerExecutor.java
@@ -50,9 +50,12 @@
import org.apache.hadoop.util.NativeCodeLoader;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.Shell.CommandExecutor;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
+import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils;
+import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
/**
* Windows secure container executor (WSCE).
@@ -558,14 +561,30 @@ public void setConf(Configuration conf) {
@Override
protected String[] getRunCommand(String command, String groupId,
- String userName, Path pidFile, Configuration conf) {
+ String userName, Path pidFile, Configuration conf, Resource resource) {
File f = new File(command);
if (LOG.isDebugEnabled()) {
- LOG.debug(String.format("getRunCommand: %s exists:%b",
- command, f.exists()));
+ LOG.debug(String.format("getRunCommand: %s exists:%b", command,
+ f.exists()));
}
- return new String[] { Shell.WINUTILS, "task", "createAsUser", groupId,
- userName, pidFile.toString(), "cmd /c " + command };
+ int cpuRate = -1;
+ int memory = -1;
+ if (resource != null) {
+ memory = resource.getMemory();
+
+ int vcores = resource.getVirtualCores();
+ // cap overall usage to the number of cores allocated to YARN
+ float yarnProcessors = NodeManagerHardwareUtils.getContainersCores(
+ ResourceCalculatorPlugin.getResourceCalculatorPlugin(null, conf),
+ conf);
+ // CPU should be set to a percentage * 100, e.g. 20% cpu rate limit
+ // should be set as 20 * 100. The following setting is equal to:
+ // 100 * (100 * (vcores / Total # of cores allocated to YARN))
+ cpuRate = Math.min(10000, (int) ((vcores * 10000) / yarnProcessors));
+ }
+ return new String[] { Shell.WINUTILS, "task", "create", "-m",
+ String.valueOf(memory), "-c", String.valueOf(cpuRate), "-u", userName,
+ "-p", pidFile.toString(), groupId, "cmd /c " + command };
}
@Override
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestWindowsContainerExecutor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestWindowsContainerExecutor.java
new file mode 100644
index 0000000..44a5765
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestWindowsContainerExecutor.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager;
+
+import static org.junit.Assume.assumeTrue;
+
+import java.util.Arrays;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestWindowsContainerExecutor {
+ @Before
+ public void setUp() {
+ // Not supported on non-Windows platforms
+ assumeTrue(Shell.WINDOWS);
+ }
+
+ @Test
+ public void testRunCommandWithResources() {
+ Configuration conf = new Configuration();
+ ContainerExecutor wce = new WindowsContainerExecutor();
+ String[] command = wce.getRunCommand("echo", "group1", null, null,
+ conf, Resource.newInstance(1024, 1));
+ // Assert the cpu and memory limits are set correctly in the command
+ String[] expected = { Shell.WINUTILS, "task", "create", "-m", "1024", "-c",
+ "1", "group1", "cmd /c " + "echo" };
+ Assert.assertTrue(Arrays.equals(expected, command));
+ }
+}