diff --git a/hadoop-assemblies/src/main/resources/assemblies/hadoop-yarn-dist.xml b/hadoop-assemblies/src/main/resources/assemblies/hadoop-yarn-dist.xml
index a15e1243b24..7e0b5ffbd06 100644
--- a/hadoop-assemblies/src/main/resources/assemblies/hadoop-yarn-dist.xml
+++ b/hadoop-assemblies/src/main/resources/assemblies/hadoop-yarn-dist.xml
@@ -56,6 +56,7 @@
         <include>stop-yarn.sh</include>
         <include>start-yarn.cmd</include>
         <include>stop-yarn.cmd</include>
+        <include>start-yarn-ucc.cmd</include>
       </includes>
       <fileMode>0755</fileMode>
diff --git a/hadoop-common-project/hadoop-common/src/main/winutils/task.c b/hadoop-common-project/hadoop-common/src/main/winutils/task.c
index 057fd8aa606..04a80d0f0d1 100644
--- a/hadoop-common-project/hadoop-common/src/main/winutils/task.c
+++ b/hadoop-common-project/hadoop-common/src/main/winutils/task.c
@@ -30,6 +30,7 @@
 #define NM_WSCE_IMPERSONATE_ALLOWED L"yarn.nodemanager.windows-secure-container-executor.impersonate.allowed"
 #define NM_WSCE_IMPERSONATE_DENIED L"yarn.nodemanager.windows-secure-container-executor.impersonate.denied"
+#define NM_SBIN_UNMANAGED_CONTAINER_CHECKER L"sbin\\start-yarn-ucc.cmd"
 
 // The S4U impersonation access check mask. Arbitrary value (we use 1 for the service access check)
 #define SERVICE_IMPERSONATE_MASK 0x00000002
@@ -53,7 +54,7 @@ typedef enum TaskCommandOptionType
 } TaskCommandOption;
 
 //----------------------------------------------------------------------------
-// Function: GetLimit
+// Function: GetLong
 //
 // Description:
 //  Get the resource limit value in long type given the command line argument.
@@ -61,7 +62,7 @@ typedef enum TaskCommandOptionType
 // Returns:
 //  TRUE: If successfully get the value
 //  FALSE: otherwise
-static BOOL GetLimit(__in const wchar_t *str, __out long *value)
+static BOOL GetLong(__in const wchar_t *str, __out long *value)
 {
   wchar_t *end = NULL;
   if (str == NULL || value == NULL) return FALSE;
@@ -78,6 +79,41 @@ static BOOL GetLimit(__in const wchar_t *str, __out long *value)
 }
 
 //----------------------------------------------------------------------------
+// Function: GetUnmanagedContainerCheckerCmdline
+//
+// Description:
+//  Get the UnmanagedContainerChecker cmdline given the containerId.
+//
+// Returns:
+//  ERROR_SUCCESS: On success
+//  GetLastError: otherwise
+static DWORD GetUnmanagedContainerCheckerCmdline(__in const wchar_t *containerId,
+  __out wchar_t *cmdLine, __in size_t cmdLineLen)
+{
+  DWORD errorCode = ERROR_SUCCESS;
+  HRESULT hr = S_OK;
+
+  WCHAR hadoopHome[16 * 1024] = L""; // 32K max env var value
+  if (GetEnvironmentVariable(L"HADOOP_HOME", hadoopHome,
+      sizeof(hadoopHome) / sizeof(WCHAR)) == 0)
+  {
+    errorCode = GetLastError();
+    ReportErrorCode(L"GetEnvironmentVariable", errorCode);
+    return errorCode;
+  }
+
+  hr = StringCchPrintf(cmdLine, cmdLineLen,
+    L"cmd /c %s\\%s %s", hadoopHome, NM_SBIN_UNMANAGED_CONTAINER_CHECKER, containerId);
+  if (FAILED(hr))
+  {
+    errorCode = HRESULT_CODE(hr);
+    ReportErrorCode(L"StringCchPrintf", errorCode);
+    return errorCode;
+  }
+
+  return errorCode;
+}
+
+//----------------------------------------------------------------------------
 // Function: ParseCommandLine
 //
 // Description:
@@ -91,7 +127,8 @@
 static BOOL ParseCommandLine(__in int argc,
                              __in_ecount(argc) wchar_t *argv[],
                              __out TaskCommandOption *command,
                              __out_opt long *memory,
-                             __out_opt long *vcore)
+                             __out_opt long *vcore,
+                             __out_opt BOOL *uccMode)
 {
   *command = TaskInvalid;
@@ -118,7 +155,7 @@
     }
   }
 
-  if (argc >= 4 && argc <= 8) {
+  if (argc >= 4 && argc <= 10) {
     if (wcscmp(argv[1], L"create") == 0)
     {
       int i;
@@ -126,7 +163,7 @@
       {
         if (wcscmp(argv[i], L"-c") == 0)
         {
-          if (vcore != NULL && !GetLimit(argv[i + 1], vcore))
+          if (vcore != NULL && !GetLong(argv[i + 1], vcore))
          {
            return FALSE;
          }
@@ -138,7 +175,7 @@
        }
        else if (wcscmp(argv[i], L"-m") == 0)
        {
-          if (memory != NULL && !GetLimit(argv[i + 1], memory))
+          if (memory != NULL && !GetLong(argv[i + 1], memory))
          {
            return FALSE;
          }
@@ -148,6 +185,22 @@
          continue;
        }
      }
+      else if (wcscmp(argv[i], L"-ucc") == 0)
+      {
+        if (argv[i + 1] == NULL)
+        {
+          return FALSE;
+        }
+
+        *uccMode = FALSE;
+        if (wcscmp(argv[i + 1], L"1") == 0)
+        {
+          *uccMode = TRUE;
+        }
+
+        i++;
+        continue;
+      }
      else
      {
        break;
      }
@@ -638,18 +691,23 @@ done:
 // Returns:
 //  ERROR_SUCCESS: On success
 //  GetLastError: otherwise
 DWORD CreateTaskImpl(__in_opt HANDLE logonHandle, __in PCWSTR jobObjName,__in PWSTR cmdLine,
-  __in LPCWSTR userName, __in long memory, __in long cpuRate)
+  __in LPCWSTR userName, __in long memory, __in long cpuRate, __in BOOL enableUccMode)
 {
   DWORD dwErrorCode = ERROR_SUCCESS;
   DWORD exitCode = EXIT_FAILURE;
   DWORD currDirCnt = 0;
   STARTUPINFO si;
   PROCESS_INFORMATION pi;
+  STARTUPINFO uccSi;
+  PROCESS_INFORMATION uccPi;
   HANDLE jobObject = NULL;
   JOBOBJECT_EXTENDED_LIMIT_INFORMATION jeli = { 0 };
   void * envBlock = NULL;
   WCHAR secureJobNameBuffer[MAX_PATH];
   LPCWSTR secureJobName = jobObjName;
+  WCHAR uccCmdLine[16 * 1024] = L""; // 32K max command line
+  HANDLE childProcessHandles[2];
+  DWORD childProcessWaitResult;
 
   wchar_t* curr_dir = NULL;
   FILE *stream = NULL;
@@ -796,8 +854,38 @@ DWORD CreateTaskImpl(__in_opt HANDLE logonHandle, __in PCWSTR jobObjName,__in PW
       &pi)) { // process info
       dwErrorCode = GetLastError();
       ReportErrorCode(L"CreateProcess", dwErrorCode);
+      goto create_process_done;
+    }
+
+    if (enableUccMode) {
+      // Create the UnmanagedContainerChecker process
+      dwErrorCode = GetUnmanagedContainerCheckerCmdline(jobObjName, uccCmdLine,
+        sizeof(uccCmdLine) / sizeof(WCHAR));
+      if (dwErrorCode != ERROR_SUCCESS) {
+        ReportErrorCode(L"GetUnmanagedContainerCheckerCmdline", dwErrorCode);
+        goto create_process_done;
+      }
+
+      ZeroMemory(&uccSi, sizeof(uccSi));
+      uccSi.cb = sizeof(uccSi);
+      ZeroMemory(&uccPi, sizeof(uccPi));
+
+      if (!CreateProcess(
+        NULL,         // ApplicationName
+        uccCmdLine,   // command line
+        NULL,         // process security attributes
+        NULL,         // thread security attributes
+        TRUE,         // inherit handles
+        0,            // creation flags
+        NULL,         // environment
+        curr_dir,     // current directory
+        &uccSi,       // startup info
+        &uccPi)) {    // process info
+        dwErrorCode = GetLastError();
+        ReportErrorCode(L"UnmanagedContainerChecker CreateProcess", dwErrorCode);
+        goto create_process_done;
+      }
+    }
+
     // task create (w/o createAsUser) does not need the ACEs change on the process
     goto create_process_done;
   }
@@ -855,12 +943,62 @@ create_process_done:
 
   CloseHandle(pi.hThread);
 
-  // Wait until child process exits.
-  WaitForSingleObject( pi.hProcess, INFINITE );
-  if(GetExitCodeProcess(pi.hProcess, &exitCode) == 0)
-  {
-    dwErrorCode = GetLastError();
+  if (enableUccMode) {
+    CloseHandle(uccPi.hThread);
+
+    childProcessHandles[0] = pi.hProcess;
+    childProcessHandles[1] = uccPi.hProcess;
+
+    // Wait until either the task process or the UnmanagedContainerChecker
+    // process exits.
+    childProcessWaitResult = WaitForMultipleObjects(
+      sizeof(childProcessHandles) / sizeof(HANDLE),
+      childProcessHandles, FALSE, INFINITE);
+
+    if (WAIT_OBJECT_0 == childProcessWaitResult) {
+      if (GetExitCodeProcess(pi.hProcess, &exitCode) == 0) {
+        dwErrorCode = GetLastError();
+        ReportErrorCode(L"GetExitCodeProcess", dwErrorCode);
+      }
+    }
+    else if (WAIT_OBJECT_0 + 1 == childProcessWaitResult) {
+      // The UnmanagedContainerChecker process exits if and only if the
+      // container with the corresponding containerId is unmanaged by the
+      // local NM. So, exit the current process to clean up the job object
+      // associated with the unmanaged container.
+      if (GetExitCodeProcess(uccPi.hProcess, &exitCode) == 0) {
+        dwErrorCode = GetLastError();
+        ReportErrorCode(L"UnmanagedContainerChecker GetExitCodeProcess", dwErrorCode);
+      }
+      else {
+        fwprintf(stderr, L"UnmanagedContainerChecker exited with exit code: %lu. "
+          L"The container is now unmanaged by YARN. "
+          L"Killing the container job object to clean up all processes of the container.",
+          exitCode);
+        fflush(stderr); // in case stderr was reopened
+      }
+
+      // The current process's exit code is aligned with
+      // ContainerExecutor.ExitCode.LOST (154), which is used when a container
+      // is recovered but its job object and exit code file cannot be found.
+      // Note that since the container is already unmanaged by YARN, the exit
+      // code will generally not be observed by YARN applications.
+      exitCode = 154;
+    }
+    else {
+      dwErrorCode = GetLastError();
+      ReportErrorCode(L"WaitForMultipleObjects", dwErrorCode);
+    }
+
+    CloseHandle(uccPi.hProcess);
   }
+  else {
+    // Wait until the task process exits.
+    WaitForSingleObject(pi.hProcess, INFINITE);
+    if (GetExitCodeProcess(pi.hProcess, &exitCode) == 0) {
+      dwErrorCode = GetLastError();
+      ReportErrorCode(L"GetExitCodeProcess", dwErrorCode);
+    }
+  }
 
   CloseHandle( pi.hProcess );
 
   if( envBlock != NULL ) {
@@ -898,10 +1036,11 @@ create_process_done:
 // Returns:
 //  ERROR_SUCCESS: On success
 //  GetLastError: otherwise
-DWORD CreateTask(__in PCWSTR jobObjName,__in PWSTR cmdLine, __in long memory, __in long cpuRate)
+DWORD CreateTask(__in PCWSTR jobObjName,__in PWSTR cmdLine, __in long memory, __in long cpuRate,
+  __in BOOL enableUccMode)
 {
   // call with null logon in order to create tasks utilizing the current logon
-  return CreateTaskImpl( NULL, jobObjName, cmdLine, NULL, memory, cpuRate);
+  return CreateTaskImpl( NULL, jobObjName, cmdLine, NULL, memory, cpuRate, enableUccMode);
 }
 
 //----------------------------------------------------------------------------
@@ -983,7 +1122,7 @@ DWORD CreateTaskAsUser(__in PCWSTR jobObjName,
     goto done;
   }
 
-  err = CreateTaskImpl(logonHandle, jobObjName, cmdLine, user, -1, -1);
+  err = CreateTaskImpl(logonHandle, jobObjName, cmdLine, user, -1, -1, FALSE);
 
 done:
   if( profileIsLoaded ) {
@@ -1187,6 +1326,7 @@ int Task(__in int argc, __in_ecount(argc) wchar_t *argv[])
   TaskCommandOption command = TaskInvalid;
   long memory = -1;
   long cpuRate = -1;
+  BOOL enableUccMode = FALSE;
   wchar_t* cmdLine = NULL;
   wchar_t buffer[16*1024] = L""; // 32K max command line
   size_t charCountBufferLeft = sizeof(buffer)/sizeof(wchar_t);
@@ -1203,7 +1343,7 @@ int Task(__in int argc, __in_ecount(argc) wchar_t *argv[])
     ARGC_COMMAND_ARGS
   };
 
-  if (!ParseCommandLine(argc, argv, &command, &memory, &cpuRate)) {
+  if (!ParseCommandLine(argc, argv, &command, &memory, &cpuRate, &enableUccMode)) {
     dwErrorCode = ERROR_INVALID_COMMAND_LINE;
 
     fwprintf(stderr, L"Incorrect command line arguments.\n\n");
@@ -1215,7 +1355,7 @@ int Task(__in int argc, __in_ecount(argc) wchar_t *argv[])
   {
     // Create the task jobobject
     //
-    dwErrorCode = CreateTask(argv[argc-2], argv[argc-1], memory, cpuRate);
+    dwErrorCode = CreateTask(argv[argc-2], argv[argc-1], memory, cpuRate, enableUccMode);
     if (dwErrorCode != ERROR_SUCCESS)
     {
       ReportErrorCode(L"CreateTask", dwErrorCode);
@@ -1336,11 +1476,16 @@ Usage: task create [OPTIONS] [TASKNAME] [COMMAND_LINE]\n\
 \n\
 OPTIONS: -c [cpu rate] set the cpu rate limit on the job object.\n\
          -m [memory] set the memory limit on the job object.\n\
+         -ucc [ucc mode] run winutils in unmanaged container cleanup mode.\n\
+\n\
         The cpu limit is an integral value of percentage * 100. The memory\n\
         limit is an integral number of memory in MB. \n\
         The limit will not be set if 0 or negative value is passed in as\n\
         parameter(s).\n\
 \n\
+        When the task runs in ucc mode, i.e. unmanaged container cleanup\n\
+        mode, it can clean up the job object associated with an unmanaged\n\
+        container.\n\
+\n\
 task createAsUser [TASKNAME] [USERNAME] [PIDFILE] [COMMAND_LINE]\n\
   Creates a new task jobobject with taskname as the user provided\n\
 \n\
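For illustration only, not part of the patch: a minimal Java sketch (Windows-only) of invoking the extended winutils interface documented above. The winutils path, the limits, and the container id are assumed values; the argument order mirrors the command array built by ContainerExecutor further below.

    // Hedged sketch: launch a task job object with unmanaged container
    // cleanup mode enabled, then observe the winutils exit code.
    public class WinutilsUccInvocation {
      public static void main(String[] args) throws Exception {
        String[] cmd = {
            "D:\\hadoop\\bin\\winutils.exe", "task", "create", // assumed path
            "-m", "2048",   // memory limit in MB
            "-c", "5000",   // cpu rate as percentage * 100, i.e. 50%
            "-ucc", "1",    // enable unmanaged container cleanup mode
            "container_1450000000000_0001_01_000002", // job object name (hypothetical)
            "cmd /c echo hello"                       // task command line
        };
        int exit = new ProcessBuilder(cmd).inheritIO().start().waitFor();
        // If the checker process exits first, winutils kills the job object
        // and returns 154, aligned with ContainerExecutor.ExitCode.LOST.
        System.out.println("winutils exited with " + exit);
      }
    }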
diff --git a/hadoop-yarn-project/hadoop-yarn/bin/start-yarn-ucc.cmd b/hadoop-yarn-project/hadoop-yarn/bin/start-yarn-ucc.cmd
new file mode 100644
index 00000000000..671d05c78dc
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/bin/start-yarn-ucc.cmd
@@ -0,0 +1,79 @@
+@echo off
+@rem Licensed to the Apache Software Foundation (ASF) under one or more
+@rem contributor license agreements.  See the NOTICE file distributed with
+@rem this work for additional information regarding copyright ownership.
+@rem The ASF licenses this file to You under the Apache License, Version 2.0
+@rem (the "License"); you may not use this file except in compliance with
+@rem the License.  You may obtain a copy of the License at
+@rem
+@rem     http://www.apache.org/licenses/LICENSE-2.0
+@rem
+@rem Unless required by applicable law or agreed to in writing, software
+@rem distributed under the License is distributed on an "AS IS" BASIS,
+@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+@rem See the License for the specific language governing permissions and
+@rem limitations under the License.
+@rem
+setlocal enabledelayedexpansion
+
+@rem This script is mainly for internal use.
+@rem It is called by a container executor, such as winutils or
+@rem container-executor, to check whether the current container is unmanaged
+@rem by YARN.
+
+echo starting yarn unmanaged container checker daemon
+
+@rem Clear variables that are fully regenerated below
+set YARN_OPTS=
+set CLASSPATH=
+set JAVA_TOOL_OPTIONS=
+
+if not defined HADOOP_BIN_PATH (
+  set HADOOP_BIN_PATH=%~dp0
+)
+
+if "%HADOOP_BIN_PATH:~-1%" == "\" (
+  set HADOOP_BIN_PATH=%HADOOP_BIN_PATH:~0,-1%
+)
+
+set DEFAULT_LIBEXEC_DIR=%HADOOP_BIN_PATH%\..\libexec
+if not defined HADOOP_LIBEXEC_DIR (
+  set HADOOP_LIBEXEC_DIR=%DEFAULT_LIBEXEC_DIR%
+)
+
+call %HADOOP_LIBEXEC_DIR%\yarn-config.cmd %*
+if "%1" == "--config" (
+  shift
+  shift
+)
+
+set YARN_UCC_CONTAINER_ID=%1
+
+if defined YARN_UCC_HEAPSIZE (
+  set YARN_HEAPSIZE=%YARN_UCC_HEAPSIZE%
+) else (
+  set YARN_HEAPSIZE=100
+)
+
+if defined YARN_UCC_ROOT_LOGGER (
+  set YARN_ROOT_LOGGER=%YARN_UCC_ROOT_LOGGER%
+) else (
+  set YARN_ROOT_LOGGER=INFO,RFA
+)
+
+if defined YARN_UCC_LOG_DIR (
+  set YARN_LOG_DIR=%YARN_UCC_LOG_DIR%
+) else (
+  @rem By default, the unmanaged container checker logs inside the container log dir
+  set YARN_LOG_DIR=%LOG_DIRS%
+)
+
+if defined YARN_UCC_LOGFILE (
+  set YARN_LOGFILE=%YARN_UCC_LOGFILE%
+) else (
+  set YARN_LOGFILE=yarn_ucc.log
+)
+
+cmd /c yarn ucc %YARN_UCC_CONTAINER_ID%
+exit /b %ERRORLEVEL%
+
+endlocal
diff --git a/hadoop-yarn-project/hadoop-yarn/bin/yarn.cmd b/hadoop-yarn-project/hadoop-yarn/bin/yarn.cmd
index 3cd57a745b0..8d9bc949f5a 100644
--- a/hadoop-yarn-project/hadoop-yarn/bin/yarn.cmd
+++ b/hadoop-yarn-project/hadoop-yarn/bin/yarn.cmd
@@ -149,7 +149,7 @@ if "%1" == "--loglevel" (
     )
   )
 
-  set yarncommands=resourcemanager nodemanager proxyserver rmadmin version jar ^
+  set yarncommands=resourcemanager nodemanager ucc proxyserver rmadmin version jar ^
     application applicationattempt cluster container node queue logs daemonlog historyserver ^
     timelineserver classpath
   for %%i in ( %yarncommands% ) do (
@@ -251,6 +251,11 @@ goto :eof
   )
   goto :eof
 
+:ucc
+  set CLASSPATH=%CLASSPATH%;%YARN_CONF_DIR%\nm-config\log4j.properties
+  set CLASS=org.apache.hadoop.yarn.server.nodemanager.util.UnmanagedContainerChecker
+  goto :eof
+
 :proxyserver
   set CLASS=org.apache.hadoop.yarn.server.webproxy.WebAppProxyServer
   set YARN_OPTS=%YARN_OPTS% %HADOOP_PROXYSERVER_OPTS%
@@ -311,6 +316,7 @@ goto :eof
   @echo where COMMAND is one of:
   @echo   resourcemanager      run the ResourceManager
   @echo   nodemanager          run a nodemanager on each slave
+  @echo   ucc                  run an unmanaged container checker for a container
   @echo   timelineserver       run the timeline server
   @echo   rmadmin              admin tools
   @echo   version              print the version
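The new ucc command resolves to UnmanagedContainerChecker, whose main method takes a single containerId argument. A minimal sketch of that call contract, with an assumed container id:

    // Hedged sketch: run the checker the same way start-yarn-ucc.cmd does
    // ("yarn ucc <containerId>") and interpret its exit code.
    public class UccLaunchExample {
      public static void main(String[] args) throws Exception {
        String containerId = "container_1450000000000_0001_01_000002"; // hypothetical
        Process p = new ProcessBuilder("cmd", "/c", "yarn", "ucc", containerId)
            .inheritIO().start();
        int exit = p.waitFor();
        // Exit codes defined by UnmanagedContainerChecker:
        //   176 (UNMANAGED_EXITCODE) - container is unmanaged by the local NM
        //   175 (FAILED_EXITCODE)    - monitoring failed; treated as unmanaged
        System.out.println("yarn ucc exited with " + exit);
      }
    }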
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index da076eb410c..af968f14cdf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1297,6 +1297,70 @@ private static void addDeprecatedKeys() {
       TIMELINE_SERVICE_PREFIX + "generic-application-history.";
 
   /**
+   * This flag determines whether the processes of a container launched
+   * by the default container executor will enable the unmanaged container
+   * cleanup feature on Windows.
+   *
+   * A container can become (partially) unmanaged due to:
+   * 1. For container resources managed by YARN, such as the container job
+   *    object and on-disk data:
+   *    a. The NM service is disabled or removed on the node.
+   *    b. The NM is unable to start up again on the node, e.g. because its
+   *       configuration or other dependent resources cannot be made ready.
+   *    c. The NM local leveldb store is corrupted, e.g. by bad disk sectors.
+   *    d. The NM has bugs, e.g. wrongly marks a live container as complete.
+   * 2. For container resources unmanaged by YARN:
+   *    a. The user breaks processes away from the container job object.
+   *    b. The user creates VMs from the container job object.
+   *    c. The user acquires other resources on the machine that are unmanaged
+   *       by YARN, such as data produced outside the container folder.
+   *
+   * Currently, the unmanaged container cleanup feature on Windows can only
+   * clean up the container job object of the unmanaged container. Cleanup
+   * of more container resources will be supported in the future.
+   *
+   * The current container is considered unmanaged when:
+   * 1. The NM is dead:
+   *    Checking whether the container is managed by the NM failed until the
+   *    timeout was reached.
+   * 2. The NM is alive but the container is
+   *    {@link org.apache.hadoop.yarn.api.records.ContainerState#COMPLETE}
+   *    or not found:
+   *    The container is
+   *    {@link org.apache.hadoop.yarn.api.records.ContainerState#COMPLETE} or
+   *    not found in the NM container list.
+   */
+  public static final String
+      NM_WINDOWS_CONTAINER_UNMANAGED_CONTAINER_CLEANUP_ENABLED =
+      NM_PREFIX + "windows-container.unmanaged-container-cleanup.enabled";
+  public static final boolean
+      DEFAULT_NM_WINDOWS_CONTAINER_UNMANAGED_CONTAINER_CLEANUP_ENABLED = false;
+
+  /**
+   * The interval at which to check whether the current container is
+   * unmanaged.
+   */
+  public static final String
+      NM_WINDOWS_CONTAINER_UNMANAGED_CONTAINER_CLEANUP_CHECK_INTERVAL_MS =
+      NM_PREFIX + "windows-container.unmanaged-container-cleanup.check-interval-ms";
+  public static final long
+      DEFAULT_NM_WINDOWS_CONTAINER_UNMANAGED_CONTAINER_CLEANUP_CHECK_INTERVAL_MS = 5000;
+
+  /**
+   * The timeout for checking whether the current container is unmanaged.
+   * If the timeout is reached, the current container is considered
+   * unmanaged. To avoid judging NM lost inconsistently with the RM, it is
+   * suggested to set this to the same value as
+   * {@link YarnConfiguration#RM_NM_EXPIRY_INTERVAL_MS}. However, since the
+   * RM forgets the last NM heartbeat time after an RM restart, in that case
+   * the timeout can only approximate the NM lost expiry behavior, and some
+   * inconsistency is expected.
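A minimal sketch, not part of the patch, showing how the keys introduced above are read; the same pattern is used by UnmanagedContainerChecker.monitorContainer further below.

    import org.apache.hadoop.yarn.conf.YarnConfiguration;

    public class UccConfigCheck {
      public static void main(String[] args) {
        YarnConfiguration conf = new YarnConfiguration();
        boolean enabled = conf.getBoolean(
            YarnConfiguration.NM_WINDOWS_CONTAINER_UNMANAGED_CONTAINER_CLEANUP_ENABLED,
            YarnConfiguration.DEFAULT_NM_WINDOWS_CONTAINER_UNMANAGED_CONTAINER_CLEANUP_ENABLED);
        long intervalMs = conf.getLong(
            YarnConfiguration.NM_WINDOWS_CONTAINER_UNMANAGED_CONTAINER_CLEANUP_CHECK_INTERVAL_MS,
            YarnConfiguration.DEFAULT_NM_WINDOWS_CONTAINER_UNMANAGED_CONTAINER_CLEANUP_CHECK_INTERVAL_MS);
        long timeoutMs = conf.getLong(
            YarnConfiguration.NM_WINDOWS_CONTAINER_UNMANAGED_CONTAINER_CLEANUP_CHECK_TIMEOUT_MS,
            YarnConfiguration.DEFAULT_NM_WINDOWS_CONTAINER_UNMANAGED_CONTAINER_CLEANUP_CHECK_TIMEOUT_MS);
        System.out.printf("ucc enabled=%b interval=%dms timeout=%dms%n",
            enabled, intervalMs, timeoutMs);
      }
    }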
+   */
+  public static final String
+      NM_WINDOWS_CONTAINER_UNMANAGED_CONTAINER_CLEANUP_CHECK_TIMEOUT_MS =
+      NM_PREFIX + "windows-container.unmanaged-container-cleanup.check-timeout-ms";
+  public static final long
+      DEFAULT_NM_WINDOWS_CONTAINER_UNMANAGED_CONTAINER_CLEANUP_CHECK_TIMEOUT_MS =
+      DEFAULT_RM_NM_EXPIRY_INTERVAL_MS;
+
+  /**
    * The setting that controls whether application history service is
    * enabled or not.
    */
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/util/WebAppUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/util/WebAppUtils.java
index 459c1106988..fccf6f530e2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/util/WebAppUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/util/WebAppUtils.java
@@ -26,6 +26,10 @@
 import java.util.ArrayList;
 import java.util.List;
 
+import com.sun.jersey.api.json.JSONConfiguration;
+import com.sun.jersey.api.json.JSONJAXBContext;
+import com.sun.jersey.api.json.JSONMarshaller;
+import com.sun.jersey.api.json.JSONUnmarshaller;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.conf.Configuration;
@@ -37,6 +41,10 @@
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.util.RMHAUtils;
 
+import javax.xml.bind.JAXBException;
+import java.io.StringReader;
+import java.io.StringWriter;
+
 @Private
 @Evolving
 public class WebAppUtils {
@@ -252,6 +260,10 @@ public static String getWebAppBindURL(
     return webAppURLWithoutScheme;
   }
 
+  public static String getNMWebAppURLWithScheme(Configuration conf) {
+    return getHttpSchemePrefix(conf) + getNMWebAppURLWithoutScheme(conf);
+  }
+
   public static String getNMWebAppURLWithoutScheme(Configuration conf) {
     if (YarnConfiguration.useHttps(conf)) {
       return conf.get(YarnConfiguration.NM_WEBAPP_HTTPS_ADDRESS,
@@ -376,4 +388,22 @@ static String getPassword(Configuration conf, String alias) {
     }
     return password;
   }
+
+  public static <T> T jsonToObject(
+      String json, Class<T> klass) throws JAXBException {
+    JSONJAXBContext jc = new JSONJAXBContext(
+        JSONConfiguration.mapped().rootUnwrapping(false).build(), klass);
+    JSONUnmarshaller jm = jc.createJSONUnmarshaller();
+    return jm.unmarshalFromJSON(new StringReader(json), klass);
+  }
+
+  public static String objectToJson(
+      Object obj, Class<?> klass) throws JAXBException {
+    StringWriter sw = new StringWriter();
+    JSONJAXBContext jc = new JSONJAXBContext(
+        JSONConfiguration.mapped().rootUnwrapping(false).build(), klass);
+    JSONMarshaller jm = jc.createJSONMarshaller();
+    jm.marshallToJSON(obj, sw);
+    return sw.toString();
+  }
 }
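A round-trip sketch for the two JSON helpers added above. Entry is a hypothetical stand-in for the JAXB-annotated NM DAO classes (such as ContainersInfo) that the checker actually passes in.

    import javax.xml.bind.annotation.XmlRootElement;
    import org.apache.hadoop.yarn.webapp.util.WebAppUtils;

    public class JsonRoundTrip {
      @XmlRootElement(name = "entry")
      public static class Entry {
        public String id;
        public String state;
      }

      public static void main(String[] args) throws Exception {
        Entry in = new Entry();
        in.id = "container_1450000000000_0001_01_000002"; // hypothetical id
        in.state = "RUNNING";
        // rootUnwrapping(false) keeps the root element name in the JSON,
        // e.g. {"entry":{"id":"...","state":"..."}}
        String json = WebAppUtils.objectToJson(in, Entry.class);
        Entry out = WebAppUtils.jsonToObject(json, Entry.class);
        System.out.println(json + " -> " + out.id + "/" + out.state);
      }
    }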
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 079d4d2817d..114aaa2377c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -1190,6 +1190,66 @@
   </property>
 
+  <property>
+    <description>
+    This flag determines whether the processes of a container launched
+    by the default container executor will enable the unmanaged container
+    cleanup feature on Windows.
+
+    A container can become (partially) unmanaged due to:
+    1. For container resources managed by YARN, such as the container job
+       object and on-disk data:
+       a. The NM service is disabled or removed on the node.
+       b. The NM is unable to start up again on the node, e.g. because its
+          configuration or other dependent resources cannot be made ready.
+       c. The NM local leveldb store is corrupted, e.g. by bad disk sectors.
+       d. The NM has bugs, e.g. wrongly marks a live container as complete.
+    2. For container resources unmanaged by YARN:
+       a. The user breaks processes away from the container job object.
+       b. The user creates VMs from the container job object.
+       c. The user acquires other resources on the machine that are
+          unmanaged by YARN, such as data produced outside the container
+          folder.
+
+    Currently, the unmanaged container cleanup feature on Windows can only
+    clean up the container job object of the unmanaged container. Cleanup
+    of more container resources will be supported in the future.
+
+    The current container is considered unmanaged when:
+    1. The NM is dead:
+       Checking whether the container is managed by the NM failed until the
+       timeout was reached.
+    2. The NM is alive but the container is
+       org.apache.hadoop.yarn.api.records.ContainerState#COMPLETE or not
+       found:
+       The container is
+       org.apache.hadoop.yarn.api.records.ContainerState#COMPLETE or not
+       found in the NM container list.
+    </description>
+    <name>yarn.nodemanager.windows-container.unmanaged-container-cleanup.enabled</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <description>
+    The interval at which to check whether the current container is
+    unmanaged.
+    </description>
+    <name>yarn.nodemanager.windows-container.unmanaged-container-cleanup.check-interval-ms</name>
+    <value>5000</value>
+  </property>
+
+  <property>
+    <description>
+    The timeout for checking whether the current container is unmanaged.
+    If the timeout is reached, the current container is considered
+    unmanaged. To avoid judging NM lost inconsistently with the RM, it is
+    suggested to set this to the same value as
+    yarn.nm.liveness-monitor.expiry-interval-ms. However, since the RM
+    forgets the last NM heartbeat time after an RM restart, in that case
+    the timeout can only approximate the NM lost expiry behavior, and some
+    inconsistency is expected.
+    </description>
+    <name>yarn.nodemanager.windows-container.unmanaged-container-cleanup.check-timeout-ms</name>
+    <value>600000</value>
+  </property>
+
   <property>
     <description>The local filesystem directory in which the node
       manager will store state when recovery is enabled.</description>
     <name>yarn.nodemanager.recovery.dir</name>
   </property>
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
index 377fd1d2d17..c2f2bae3de2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
@@ -330,6 +330,7 @@ protected Path getPidFilePath(ContainerId containerId) {
     if (Shell.WINDOWS) {
       int cpuRate = -1;
       int memory = -1;
+      boolean ucc = false;
       if (resource != null) {
         if (conf
             .getBoolean(
@@ -365,9 +366,16 @@ protected Path getPidFilePath(ContainerId containerId) {
           cpuRate = Math.min(10000,
               (int) ((containerVCores * 10000) / yarnVCores));
         }
+
+        if (conf.getBoolean(
+            YarnConfiguration.NM_WINDOWS_CONTAINER_UNMANAGED_CONTAINER_CLEANUP_ENABLED,
+            YarnConfiguration.DEFAULT_NM_WINDOWS_CONTAINER_UNMANAGED_CONTAINER_CLEANUP_ENABLED)) {
+          ucc = true;
+        }
       }
       return new String[] { Shell.WINUTILS, "task", "create", "-m",
-          String.valueOf(memory), "-c", String.valueOf(cpuRate), groupId,
+          String.valueOf(memory), "-c", String.valueOf(cpuRate),
+          "-ucc", ucc ? "1" : "0", groupId,
           "cmd /c " + command };
     } else {
       List<String> retCommand = new ArrayList<String>();
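A worked example of the Windows branch above, under assumed numbers: a 2-vcore container on an 8-vcore NM with a 2048 MB limit and cleanup enabled yields -c 2500 (25 percent times 100) and -ucc 1.

    public class WinutilsArgsExample {
      public static void main(String[] args) {
        int containerVCores = 2, yarnVCores = 8, memory = 2048; // assumed values
        boolean ucc = true;                                     // cleanup enabled
        int cpuRate = Math.min(10000, (containerVCores * 10000) / yarnVCores);
        String[] command = { "winutils", "task", "create",
            "-m", String.valueOf(memory), "-c", String.valueOf(cpuRate),
            "-ucc", ucc ? "1" : "0",
            "container_1450000000000_0001_01_000002",           // groupId (hypothetical)
            "cmd /c " + "echo hello" };
        System.out.println(String.join(" ", command));
        // -> winutils task create -m 2048 -c 2500 -ucc 1 container_... cmd /c echo hello
      }
    }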
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
index f55e0e51394..80d13f1bb7f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
@@ -346,22 +346,7 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher,
     stateMachine;
 
   public org.apache.hadoop.yarn.api.records.ContainerState getCurrentState() {
-    switch (stateMachine.getCurrentState()) {
-    case NEW:
-    case LOCALIZING:
-    case LOCALIZATION_FAILED:
-    case LOCALIZED:
-    case RUNNING:
-    case EXITED_WITH_SUCCESS:
-    case EXITED_WITH_FAILURE:
-    case KILLING:
-    case CONTAINER_CLEANEDUP_AFTER_KILL:
-    case CONTAINER_RESOURCES_CLEANINGUP:
-      return org.apache.hadoop.yarn.api.records.ContainerState.RUNNING;
-    case DONE:
-    default:
-      return org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE;
-    }
+    return ContainerState.toApiContainerState(stateMachine.getCurrentState());
   }
 
   @Override
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java
index a43df892874..bec05d1e300 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java
@@ -21,5 +21,25 @@
 public enum ContainerState {
   NEW, LOCALIZING, LOCALIZATION_FAILED, LOCALIZED, RUNNING,
   EXITED_WITH_SUCCESS, EXITED_WITH_FAILURE, KILLING, CONTAINER_CLEANEDUP_AFTER_KILL,
-  CONTAINER_RESOURCES_CLEANINGUP, DONE
+  CONTAINER_RESOURCES_CLEANINGUP, DONE;
+
+  public static org.apache.hadoop.yarn.api.records.ContainerState
+      toApiContainerState(ContainerState state) {
+    switch (state) {
+    case NEW:
+    case LOCALIZING:
+    case LOCALIZATION_FAILED:
+    case LOCALIZED:
+    case RUNNING:
+    case EXITED_WITH_SUCCESS:
+    case EXITED_WITH_FAILURE:
+    case KILLING:
+    case CONTAINER_CLEANEDUP_AFTER_KILL:
+    case CONTAINER_RESOURCES_CLEANINGUP:
+      return org.apache.hadoop.yarn.api.records.ContainerState.RUNNING;
+    case DONE:
+    default:
+      return org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE;
+    }
+  }
 }
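A quick illustration of the mapping that ContainerImpl now delegates to the enum: every NM-internal state except DONE maps to the API RUNNING state, and DONE maps to COMPLETE.

    import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;

    public class StateMappingExample {
      public static void main(String[] args) {
        // Prints, e.g., "NEW -> RUNNING" ... "DONE -> COMPLETE"
        for (ContainerState state : ContainerState.values()) {
          System.out.println(state + " -> "
              + ContainerState.toApiContainerState(state));
        }
      }
    }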
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/UnmanagedContainerChecker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/UnmanagedContainerChecker.java
new file mode 100644
index 00000000000..10588f8f1b7
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/UnmanagedContainerChecker.java
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.util;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
+import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.ContainerInfo;
+import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.ContainersInfo;
+import org.apache.hadoop.yarn.util.StringHelper;
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.BasicResponseHandler;
+import org.apache.http.impl.client.DefaultHttpClient;
+
+import javax.xml.bind.JAXBException;
+import java.io.IOException;
+
+/**
+ * Called by a container executor, such as winutils or container-executor, to
+ * check whether the current container is unmanaged by YARN.
+ *
+ * A container can become (partially) unmanaged due to:
+ * 1. For container resources managed by YARN, such as the container job
+ *    object and on-disk data:
+ *    a. The NM service is disabled or removed on the node.
+ *    b. The NM is unable to start up again on the node, e.g. because its
+ *       configuration or other dependent resources cannot be made ready.
+ *    c. The NM local leveldb store is corrupted, e.g. by bad disk sectors.
+ *    d. The NM has bugs, e.g. wrongly marks a live container as complete.
+ * 2. For container resources unmanaged by YARN:
+ *    a. The user breaks processes away from the container job object.
+ *    b. The user creates VMs from the container job object.
+ *    c. The user acquires other resources on the machine that are unmanaged
+ *       by YARN, such as data produced outside the container folder.
+ *
+ * The current container is considered unmanaged when:
+ * 1. The NM is dead:
+ *    Checking whether the container is managed by the NM failed until the
+ *    timeout was reached.
+ * 2. The NM is alive but the container is
+ *    {@link org.apache.hadoop.yarn.api.records.ContainerState#COMPLETE}
+ *    or not found:
+ *    The container is
+ *    {@link org.apache.hadoop.yarn.api.records.ContainerState#COMPLETE} or
+ *    not found in the NM container list.
+ */
+public class UnmanagedContainerChecker {
+
+  private enum LogLevel {INFO, WARN, ERROR}
+
+  private static final Log LOG = LogFactory.getLog(UnmanagedContainerChecker.class);
+  public static final int FAILED_EXITCODE = 175;
+  public static final int UNMANAGED_EXITCODE = 176;
+
+  // Usage: containerId
+  // The process exits if and only if the container with the corresponding
+  // containerId is unmanaged by the local NM.
+  public static void main(String[] argv) {
+    int exitCode;
+    try {
+      monitorContainer(argv[0]);
+      exitCode = UNMANAGED_EXITCODE;
+    } catch (Throwable t) {
+      log(LogLevel.ERROR, "Failed to monitor container. "
+          + "Consider it unmanaged.", t);
+      exitCode = FAILED_EXITCODE;
+    }
+
+    log(LogLevel.ERROR, "Exiting with exitcode: " + exitCode);
+    System.exit(exitCode);
+  }
+
+  // Only throws unrecoverable, non-transient errors, so the caller is not
+  // expected to retry.
+  private static void monitorContainer(String containerId) {
+    Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
+
+    YarnConfiguration conf = new YarnConfiguration();
+    long checkIntervalMs = conf.getLong(
+        YarnConfiguration.NM_WINDOWS_CONTAINER_UNMANAGED_CONTAINER_CLEANUP_CHECK_INTERVAL_MS,
+        YarnConfiguration.DEFAULT_NM_WINDOWS_CONTAINER_UNMANAGED_CONTAINER_CLEANUP_CHECK_INTERVAL_MS);
+    long checkTimeoutMs = conf.getLong(
+        YarnConfiguration.NM_WINDOWS_CONTAINER_UNMANAGED_CONTAINER_CLEANUP_CHECK_TIMEOUT_MS,
+        YarnConfiguration.DEFAULT_NM_WINDOWS_CONTAINER_UNMANAGED_CONTAINER_CLEANUP_CHECK_TIMEOUT_MS);
+
+    String nmContainersUri = StringHelper.pjoin(
+        WebAppUtils.getNMWebAppURLWithScheme(conf),
+        "ws/v1/node", "containers");
+
+    long failedToCheckStartTimeMs = -1;
+    while (true) {
+      try {
+        boolean isManaged =
+            isContainerManagedInNM(nmContainersUri, containerId);
+
+        if (failedToCheckStartTimeMs != -1) {
+          log(LogLevel.INFO,
+              "Successfully checked whether the container is managed by NM. "
+                  + "Check timeout canceled.");
+          failedToCheckStartTimeMs = -1;
+        }
+
+        if (!isManaged) {
+          log(LogLevel.ERROR, "Container is not managed in NM.");
+          return;
+        }
+      } catch (Throwable t) {
+        long failedToCheckCurrentTimeMs = System.currentTimeMillis();
+        log(LogLevel.WARN,
+            "Failed to check whether the container is managed by NM "
+                + "from time " + failedToCheckStartTimeMs + "ms "
+                + "to time " + failedToCheckCurrentTimeMs + "ms. "
+                + "Check timeout is " + checkTimeoutMs + "ms.", t);
+        if (failedToCheckStartTimeMs == -1) {
+          failedToCheckStartTimeMs = failedToCheckCurrentTimeMs;
+        } else {
+          if (failedToCheckCurrentTimeMs - failedToCheckStartTimeMs >=
+              checkTimeoutMs) {
+            log(LogLevel.ERROR,
+                "Failed to check whether the container is managed by NM "
+                    + "within the timeout. Consider it unmanaged.");
+            return;
+          }
+        }
+      }
+
+      try {
+        Thread.sleep(checkIntervalMs);
+      } catch (Throwable t) {
+        log(LogLevel.WARN, "Failed to sleep, ignoring.", t);
+      }
+    }
+  }
+
+  private static void log(
+      LogLevel level, Object message) {
+    log(level, message, null);
+  }
+
+  private static void log(
+      LogLevel level, Object message, Throwable t) {
+    String currentTimeMs = "[" + System.currentTimeMillis() + "ms]: ";
+    String printMessage = currentTimeMs + message;
+
+    switch (level) {
+    case INFO:
+      // Logs to the console for diagnostics
+      System.out.println(printMessage);
+      // Logs to a file for reliable debugging
+      LOG.info(message, t);
+      break;
+    case WARN:
+      System.err.println(printMessage);
+      LOG.warn(message, t);
+      break;
+    case ERROR:
+      System.err.println(printMessage);
+      LOG.error(message, t);
+      break;
+    }
+  }
+
+  private static boolean isContainerManagedInNM(
+      String nmContainersUri, String containerId) throws IOException, JAXBException {
+    ContainersInfo containersInfo = getContainersInfoFromNM(nmContainersUri);
+    for (ContainerInfo containerInfo : containersInfo.getContainers()) {
+      if (containerInfo.getId().equals(containerId)) {
+        return isContainerStateManagedInNM(containerInfo.getState());
+      }
+    }
+    return false;
+  }
+
+  private static boolean isContainerStateManagedInNM(String containerStateStr) {
+    ContainerState containerState;
+    try {
+      containerState = ContainerState.valueOf(containerStateStr);
+    } catch (Exception e) {
+      log(LogLevel.WARN, "Failed to parse ContainerState from string to enum: "
+          + containerStateStr, e);
+      return false;
+    }
+
+    // Only the API Container COMPLETE state is considered unmanaged, because:
+    // 1. Other NM container states may still allow a running container
+    //    process to exist, such as NEW, LOCALIZING, LOCALIZED.
+    // 2. The COMPLETE state comes from the API, which means it will be
+    //    reported to the application, so we can safely consider the
+    //    container unmanaged.
+    if (ContainerState.toApiContainerState(containerState) ==
+        org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE) {
+      return false;
+    }
+
+    return true;
+  }
+
+  // Since the NM starts its web server only after all containers managed by
+  // it are recovered, the returned ContainersInfo is guaranteed to be
+  // complete.
+  private static ContainersInfo getContainersInfoFromNM(
+      String nmContainersUri) throws IOException, JAXBException {
+    DefaultHttpClient client = new DefaultHttpClient();
+    HttpGet httpGet = new HttpGet(nmContainersUri);
+    httpGet.setHeader("Accept", "application/json");
+
+    HttpResponse response = client.execute(httpGet);
+
+    int statusCode = response.getStatusLine().getStatusCode();
+    if (!(statusCode >= 200 && statusCode <= 299)) {
+      throw new IOException(
+          "Http request failed with status code: " + statusCode);
+    }
+
+    String jsonContent = new BasicResponseHandler().handleResponse(response);
+    return WebAppUtils.jsonToObject(jsonContent, ContainersInfo.class);
+  }
+}
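For reference, the NM endpoint that getContainersInfoFromNM polls can be probed standalone. A minimal JDK-only sketch; the URL assumes plain HTTP and the default NM web port (8042):

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.net.HttpURLConnection;
    import java.net.URL;

    public class NmContainersProbe {
      public static void main(String[] args) throws IOException {
        URL url = new URL("http://localhost:8042/ws/v1/node/containers");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestProperty("Accept", "application/json");
        int status = conn.getResponseCode();
        if (status < 200 || status > 299) {
          throw new IOException("Http request failed with status code: " + status);
        }
        StringBuilder json = new StringBuilder();
        try (BufferedReader in = new BufferedReader(
            new InputStreamReader(conn.getInputStream()))) {
          for (String line; (line = in.readLine()) != null; ) {
            json.append(line);
          }
        }
        // The checker feeds this JSON into WebAppUtils.jsonToObject(json,
        // ContainersInfo.class) and scans the list for its containerId.
        System.out.println(json);
      }
    }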