diff --git hadoop-common-project/hadoop-common/src/main/winutils/chown.c hadoop-common-project/hadoop-common/src/main/winutils/chown.c index bc2aefc..1be8121 100644 --- hadoop-common-project/hadoop-common/src/main/winutils/chown.c +++ hadoop-common-project/hadoop-common/src/main/winutils/chown.c @@ -63,11 +63,11 @@ static DWORD ChangeFileOwnerBySid(__in LPCWSTR path, // SID is not contained in the caller's token, and have the SE_GROUP_OWNER // permission enabled. // - if (!EnablePrivilege(L"SeTakeOwnershipPrivilege")) + if (EnablePrivilege(L"SeTakeOwnershipPrivilege") != ERROR_SUCCESS) { fwprintf(stdout, L"INFO: The user does not have SeTakeOwnershipPrivilege.\n"); } - if (!EnablePrivilege(L"SeRestorePrivilege")) + if (EnablePrivilege(L"SeRestorePrivilege") != ERROR_SUCCESS) { fwprintf(stdout, L"INFO: The user does not have SeRestorePrivilege.\n"); } diff --git hadoop-common-project/hadoop-common/src/main/winutils/include/winutils.h hadoop-common-project/hadoop-common/src/main/winutils/include/winutils.h index 1c0007a..bae754c 100644 --- hadoop-common-project/hadoop-common/src/main/winutils/include/winutils.h +++ hadoop-common-project/hadoop-common/src/main/winutils/include/winutils.h @@ -27,6 +27,8 @@ #include #include #include +#include +#include enum EXIT_CODE { @@ -153,6 +155,26 @@ DWORD ChangeFileModeByMask(__in LPCWSTR path, INT mode); DWORD GetLocalGroupsForUser(__in LPCWSTR user, __out LPLOCALGROUP_USERS_INFO_0 *groups, __out LPDWORD entries); -BOOL EnablePrivilege(__in LPCWSTR privilegeName); - void GetLibraryName(__in LPCVOID lpAddress, __out LPWSTR *filename); + +DWORD EnablePrivilege(__in LPCWSTR privilegeName); + +void AssignLsaString(__inout LSA_STRING * target, __in const char *strBuf); + +DWORD RegisterWithLsa(__in const char *logonProcessName, __out HANDLE * lsaHandle); + +void UnregisterWithLsa(__in HANDLE lsaHandle); + +DWORD LookupKerberosAuthenticationPackageId(__in HANDLE lsaHandle, __out ULONG * packageId); + +DWORD CreateLogonForUser(__in HANDLE lsaHandle, + __in const char * tokenSourceName, + __in const char * tokenOriginName, + __in ULONG authnPkgId, + __in const wchar_t* principalName, + __out HANDLE *tokenHandle); + +DWORD LoadUserProfileForLogon(__in HANDLE logonHandle, __out PROFILEINFO * pi); + +DWORD UnloadProfileForLogon(__in HANDLE logonHandle, __in PROFILEINFO * pi); + diff --git hadoop-common-project/hadoop-common/src/main/winutils/libwinutils.c hadoop-common-project/hadoop-common/src/main/winutils/libwinutils.c index 391247f..da16ff5 100644 --- hadoop-common-project/hadoop-common/src/main/winutils/libwinutils.c +++ hadoop-common-project/hadoop-common/src/main/winutils/libwinutils.c @@ -17,6 +17,8 @@ #pragma comment(lib, "authz.lib") #pragma comment(lib, "netapi32.lib") +#pragma comment(lib, "Secur32.lib") +#pragma comment(lib, "Userenv.lib") #include "winutils.h" #include #include @@ -797,7 +799,6 @@ DWORD FindFileOwnerAndPermission( __out_opt PINT pMask) { DWORD dwRtnCode = 0; - PSECURITY_DESCRIPTOR pSd = NULL; PSID psidOwner = NULL; @@ -1638,11 +1639,12 @@ GetLocalGroupsForUserEnd: // to the process's access token. // // Returns: -// TRUE: on success +// ERROR_SUCCESS on success +// GetLastError() on error // // Notes: // -BOOL EnablePrivilege(__in LPCWSTR privilegeName) +DWORD EnablePrivilege(__in LPCWSTR privilegeName) { HANDLE hToken = INVALID_HANDLE_VALUE; TOKEN_PRIVILEGES tp = { 0 }; @@ -1651,28 +1653,31 @@ BOOL EnablePrivilege(__in LPCWSTR privilegeName) if (!OpenProcessToken(GetCurrentProcess(), TOKEN_ADJUST_PRIVILEGES | TOKEN_QUERY, &hToken)) { - ReportErrorCode(L"OpenProcessToken", GetLastError()); - return FALSE; + dwErrCode = GetLastError(); + ReportErrorCode(L"OpenProcessToken", dwErrCode); + return dwErrCode; } tp.PrivilegeCount = 1; if (!LookupPrivilegeValueW(NULL, privilegeName, &(tp.Privileges[0].Luid))) { - ReportErrorCode(L"LookupPrivilegeValue", GetLastError()); + dwErrCode = GetLastError(); + ReportErrorCode(L"LookupPrivilegeValue", dwErrCode); CloseHandle(hToken); - return FALSE; + return dwErrCode; } tp.Privileges[0].Attributes = SE_PRIVILEGE_ENABLED; // As stated on MSDN, we need to use GetLastError() to check if // AdjustTokenPrivileges() adjusted all of the specified privileges. // - AdjustTokenPrivileges(hToken, FALSE, &tp, 0, NULL, NULL); + if( !AdjustTokenPrivileges(hToken, FALSE, &tp, 0, NULL, NULL) ) { dwErrCode = GetLastError(); + } CloseHandle(hToken); - return dwErrCode == ERROR_SUCCESS; + return dwErrCode; } //---------------------------------------------------------------------------- @@ -1716,9 +1721,6 @@ void ReportErrorCode(LPCWSTR func, DWORD err) // Description: // Given an address, get the file name of the library from which it was loaded. // -// Returns: -// None -// // Notes: // - The function allocates heap memory and points the filename out parameter to // the newly allocated memory, which will contain the name of the file. @@ -1757,3 +1759,290 @@ cleanup: *filename = NULL; } } + +// Function: AssignLsaString +// +// Description: +// fills in values of LSA_STRING struct to point to a string buffer +// +// Returns: +// None +// +// IMPORTANT*** strBuf is not copied. It must be globally immutable +// +void AssignLsaString(__inout LSA_STRING * target, __in const char *strBuf) +{ + target->Length = (USHORT)(sizeof(char)*strlen(strBuf)); + target->MaximumLength = target->Length; + target->Buffer = (char *)(strBuf); +} + +//---------------------------------------------------------------------------- +// Function: RegisterWithLsa +// +// Description: +// Registers with local security authority and sets handle for use in later LSA +// operations +// +// Returns: +// ERROR_SUCCESS on success +// Other error code on failure +// +// Notes: +// +DWORD RegisterWithLsa(__in const char *logonProcessName, __out HANDLE * lsaHandle) +{ + LSA_STRING processName; + LSA_OPERATIONAL_MODE o_mode; // never useful as per msdn docs + NTSTATUS registerStatus; + *lsaHandle = 0; + + AssignLsaString(&processName, logonProcessName); + registerStatus = LsaRegisterLogonProcess(&processName, lsaHandle, &o_mode); + + return LsaNtStatusToWinError( registerStatus ); +} + +//---------------------------------------------------------------------------- +// Function: UnregisterWithLsa +// +// Description: +// Closes LSA handle allocated by RegisterWithLsa() +// +// Returns: +// None +// +// Notes: +// +void UnregisterWithLsa(__in HANDLE lsaHandle) +{ + LsaClose(lsaHandle); +} + +//---------------------------------------------------------------------------- +// Function: LookupKerberosAuthenticationPackageId +// +// Description: +// Looks of the current id (integer index) of the Kerberos authentication package on the local +// machine. +// +// Returns: +// ERROR_SUCCESS on success +// Other error code on failure +// +// Notes: +// +DWORD LookupKerberosAuthenticationPackageId(__in HANDLE lsaHandle, __out ULONG * packageId) +{ + NTSTATUS lookupStatus; + LSA_STRING pkgName; + + AssignLsaString(&pkgName, MICROSOFT_KERBEROS_NAME_A); + lookupStatus = LsaLookupAuthenticationPackage(lsaHandle, &pkgName, packageId); + return LsaNtStatusToWinError( lookupStatus ); +} + +//---------------------------------------------------------------------------- +// Function: CreateLogonForUser +// +// Description: +// Contacts the local LSA and performs a logon without credential for the +// given principal. This logon token will be local machine only and have no +// network credentials attached. +// +// Returns: +// ERROR_SUCCESS on success +// Other error code on failure +// +// Notes: +// This call assumes that all required privileges have already been enabled (TCB etc). +// IMPORTANT **** tokenOriginName must be immutable! +// +DWORD CreateLogonForUser(__in HANDLE lsaHandle, + __in const char * tokenSourceName, + __in const char * tokenOriginName, // must be immutable, will not be copied! + __in ULONG authnPkgId, + __in const wchar_t* principalName, + __out HANDLE *tokenHandle) +{ + DWORD logonStatus = ERROR_ASSERTION_FAILURE; // Failure to set status should trigger error + TOKEN_SOURCE tokenSource; + LSA_STRING originName; + void * profile = NULL; + + // from MSDN: + // The ClientUpn and ClientRealm members of the KERB_S4U_LOGON + // structure must point to buffers in memory that are contiguous + // to the structure itself. The value of the + // AuthenticationInformationLength parameter must take into + // account the length of these buffers. + const int principalNameBufLen = lstrlen(principalName)*sizeof(*principalName); + const int totalAuthInfoLen = sizeof(KERB_S4U_LOGON) + principalNameBufLen; + KERB_S4U_LOGON* s4uLogonAuthInfo = (KERB_S4U_LOGON*)calloc(totalAuthInfoLen, 1); + if (s4uLogonAuthInfo == NULL ) { + logonStatus = ERROR_NOT_ENOUGH_MEMORY; + goto done; + } + s4uLogonAuthInfo->MessageType = KerbS4ULogon; + s4uLogonAuthInfo->ClientUpn.Buffer = (wchar_t*)((char*)s4uLogonAuthInfo + sizeof *s4uLogonAuthInfo); + CopyMemory(s4uLogonAuthInfo->ClientUpn.Buffer, principalName, principalNameBufLen); + s4uLogonAuthInfo->ClientUpn.Length = (USHORT)principalNameBufLen; + s4uLogonAuthInfo->ClientUpn.MaximumLength = (USHORT)principalNameBufLen; + + AllocateLocallyUniqueId(&tokenSource.SourceIdentifier); + StringCchCopyA(tokenSource.SourceName, TOKEN_SOURCE_LENGTH, tokenSourceName ); + AssignLsaString(&originName, tokenOriginName); + + { + DWORD cbProfile = 0; + LUID logonId; + QUOTA_LIMITS quotaLimits; + NTSTATUS subStatus; + + NTSTATUS logonNtStatus = LsaLogonUser(lsaHandle, + &originName, + Batch, // SECURITY_LOGON_TYPE + authnPkgId, + s4uLogonAuthInfo, + totalAuthInfoLen, + 0, + &tokenSource, + &profile, + &cbProfile, + &logonId, + tokenHandle, + "aLimits, + &subStatus); + logonStatus = LsaNtStatusToWinError( logonNtStatus ); + } +done: + // clean up + if (s4uLogonAuthInfo != NULL) { + free(s4uLogonAuthInfo); + } + if (profile != NULL) { + LsaFreeReturnBuffer(profile); + } + return logonStatus; +} + +// NOTE: must free allocatedName +DWORD GetNameFromLogonToken(__in HANDLE logonToken, __out wchar_t **allocatedName) +{ + DWORD userInfoSize = 0; + PTOKEN_USER user = NULL; + DWORD userNameSize = 0; + wchar_t * userName = NULL; + DWORD domainNameSize = 0; + wchar_t * domainName = NULL; + SID_NAME_USE sidUse = SidTypeUnknown; + DWORD getNameStatus = ERROR_ASSERTION_FAILURE; // Failure to set status should trigger error + BOOL tokenInformation = FALSE; + + // call for sid size then alloc and call for sid + tokenInformation = GetTokenInformation(logonToken, TokenUser, NULL, 0, &userInfoSize); + assert (FALSE == tokenInformation); + + // last call should have failed and filled in allocation size + if ((getNameStatus = GetLastError()) != ERROR_INSUFFICIENT_BUFFER) + { + goto done; + } + user = (PTOKEN_USER)calloc(userInfoSize,1); + if (user == NULL) + { + getNameStatus = ERROR_NOT_ENOUGH_MEMORY; + goto done; + } + if (!GetTokenInformation(logonToken, TokenUser, user, userInfoSize, &userInfoSize)) { + getNameStatus = GetLastError(); + goto done; + } + LookupAccountSid( NULL, user->User.Sid, NULL, &userNameSize, NULL, &domainNameSize, &sidUse ); + // last call should have failed and filled in allocation size + if ((getNameStatus = GetLastError()) != ERROR_INSUFFICIENT_BUFFER) + { + goto done; + } + userName = (wchar_t *)calloc(userNameSize, sizeof(wchar_t)); + if (userName == NULL) { + getNameStatus = ERROR_NOT_ENOUGH_MEMORY; + goto done; + } + domainName = (wchar_t *)calloc(domainNameSize, sizeof(wchar_t)); + if (domainName == NULL) { + getNameStatus = ERROR_NOT_ENOUGH_MEMORY; + goto done; + } + if (!LookupAccountSid( NULL, user->User.Sid, userName, &userNameSize, domainName, &domainNameSize, &sidUse )) { + getNameStatus = GetLastError(); + goto done; + } + + getNameStatus = ERROR_SUCCESS; + *allocatedName = userName; + userName = NULL; +done: + if (user != NULL) { + free( user ); + user = NULL; + } + if (userName != NULL) { + free( userName ); + userName = NULL; + } + if (domainName != NULL) { + free( domainName ); + domainName = NULL; + } + return getNameStatus; +} + +DWORD LoadUserProfileForLogon(__in HANDLE logonHandle, __out PROFILEINFO * pi) +{ + wchar_t *userName = NULL; + DWORD loadProfileStatus = ERROR_ASSERTION_FAILURE; // Failure to set status should trigger error + + loadProfileStatus = GetNameFromLogonToken( logonHandle, &userName ); + if (loadProfileStatus != ERROR_SUCCESS) { + goto done; + } + + assert(pi); + + ZeroMemory( pi, sizeof(*pi) ); + pi->dwSize = sizeof(*pi); + pi->lpUserName = userName; + pi->dwFlags = PI_NOUI; + + // if the profile does not exist it will be created + if ( !LoadUserProfile( logonHandle, pi ) ) { + loadProfileStatus = GetLastError(); + goto done; + } + + loadProfileStatus = ERROR_SUCCESS; +done: + return loadProfileStatus; +} + +DWORD UnloadProfileForLogon(__in HANDLE logonHandle, __in PROFILEINFO * pi) +{ + DWORD touchProfileStatus = ERROR_ASSERTION_FAILURE; // Failure to set status should trigger error + + assert(pi); + + if ( !UnloadUserProfile(logonHandle, pi->hProfile ) ) { + touchProfileStatus = GetLastError(); + goto done; + } + if (pi->lpUserName != NULL) { + free(pi->lpUserName); + pi->lpUserName = NULL; + } + ZeroMemory( pi, sizeof(*pi) ); + + touchProfileStatus = ERROR_SUCCESS; +done: + return touchProfileStatus; +} diff --git hadoop-common-project/hadoop-common/src/main/winutils/symlink.c hadoop-common-project/hadoop-common/src/main/winutils/symlink.c index ea372cc..02acd4d 100644 --- hadoop-common-project/hadoop-common/src/main/winutils/symlink.c +++ hadoop-common-project/hadoop-common/src/main/winutils/symlink.c @@ -77,7 +77,7 @@ int Symlink(__in int argc, __in_ecount(argc) wchar_t *argv[]) // This is just an additional step to do the privilege check by not using // error code from CreateSymbolicLink() method. // - if (!EnablePrivilege(L"SeCreateSymbolicLinkPrivilege")) + if (EnablePrivilege(L"SeCreateSymbolicLinkPrivilege") != ERROR_SUCCESS) { fwprintf(stderr, L"No privilege to create symbolic links.\n"); diff --git hadoop-common-project/hadoop-common/src/main/winutils/task.c hadoop-common-project/hadoop-common/src/main/winutils/task.c index 19bda96..783f162 100644 --- hadoop-common-project/hadoop-common/src/main/winutils/task.c +++ hadoop-common-project/hadoop-common/src/main/winutils/task.c @@ -18,6 +18,7 @@ #include "winutils.h" #include #include +#include #define PSAPI_VERSION 1 #pragma comment(lib, "psapi.lib") @@ -28,12 +29,18 @@ // process exits with 128 + signal. For SIGKILL, this would be 128 + 9 = 137. #define KILLED_PROCESS_EXIT_CODE 137 +// Name for tracking this logon process when registering with LSA +static const char *LOGON_PROCESS_NAME="Hadoop Container Executor"; +// Name for token source, must be less or eq to TOKEN_SOURCE_LENGTH (currently 8) chars +static const char *TOKEN_SOURCE_NAME = "HadoopEx"; + // List of different task related command line options supported by // winutils. typedef enum TaskCommandOptionType { TaskInvalid, TaskCreate, + TaskCreateAsUser, TaskIsAlive, TaskKill, TaskProcessList @@ -86,37 +93,53 @@ static BOOL ParseCommandLine(__in int argc, } } + if (argc >= 6) { + if (wcscmp(argv[1], L"createAsUser") == 0) + { + *command = TaskCreateAsUser; + return TRUE; + } + } + return FALSE; } //---------------------------------------------------------------------------- -// Function: createTask +// Function: CreateTaskImpl // // Description: // Creates a task via a jobobject. Outputs the // appropriate information to stdout on success, or stderr on failure. +// logonHandle may be NULL, in this case the current logon will be utilized for the +// created process // // Returns: // ERROR_SUCCESS: On success // GetLastError: otherwise -DWORD createTask(__in PCWSTR jobObjName,__in PWSTR cmdLine) +DWORD CreateTaskImpl(__in_opt HANDLE logonHandle, __in PCWSTR jobObjName,__in PWSTR cmdLine) { - DWORD err = ERROR_SUCCESS; + DWORD dwErrorCode = ERROR_SUCCESS; DWORD exitCode = EXIT_FAILURE; + DWORD currDirCnt = 0; STARTUPINFO si; PROCESS_INFORMATION pi; HANDLE jobObject = NULL; JOBOBJECT_EXTENDED_LIMIT_INFORMATION jeli = { 0 }; + void * envBlock = NULL; + BOOL createProcessResult = FALSE; + + wchar_t* curr_dir = NULL; + FILE *stream = NULL; // Create un-inheritable job object handle and set job object to terminate // when last handle is closed. So winutils.exe invocation has the only open // job object handle. Exit of winutils.exe ensures termination of job object. // Either a clean exit of winutils or crash or external termination. jobObject = CreateJobObject(NULL, jobObjName); - err = GetLastError(); - if(jobObject == NULL || err == ERROR_ALREADY_EXISTS) + dwErrorCode = GetLastError(); + if(jobObject == NULL || dwErrorCode == ERROR_ALREADY_EXISTS) { - return err; + return dwErrorCode; } jeli.BasicLimitInformation.LimitFlags = JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE; if(SetInformationJobObject(jobObject, @@ -124,36 +147,102 @@ DWORD createTask(__in PCWSTR jobObjName,__in PWSTR cmdLine) &jeli, sizeof(jeli)) == 0) { - err = GetLastError(); + dwErrorCode = GetLastError(); CloseHandle(jobObject); - return err; + return dwErrorCode; } if(AssignProcessToJobObject(jobObject, GetCurrentProcess()) == 0) { - err = GetLastError(); + dwErrorCode = GetLastError(); CloseHandle(jobObject); - return err; + return dwErrorCode; } // the child JVM uses this env var to send the task OS process identifier // to the TaskTracker. We pass the job object name. if(SetEnvironmentVariable(L"JVM_PID", jobObjName) == 0) { - err = GetLastError(); - CloseHandle(jobObject); - return err; + dwErrorCode = GetLastError(); + // We have to explictly Terminate, passing in the error code + // simply closing the job would kill our own process with success exit status + TerminateJobObject(jobObject, dwErrorCode); + return dwErrorCode; } ZeroMemory( &si, sizeof(si) ); si.cb = sizeof(si); ZeroMemory( &pi, sizeof(pi) ); - if (CreateProcess(NULL, cmdLine, NULL, NULL, TRUE, 0, NULL, NULL, &si, &pi) == 0) - { - err = GetLastError(); - CloseHandle(jobObject); - return err; + if( logonHandle != NULL ) { + // create user environment for this logon + if(!CreateEnvironmentBlock(&envBlock, + logonHandle, + TRUE )) { + dwErrorCode = GetLastError(); + // We have to explictly Terminate, passing in the error code + // simply closing the job would kill our own process with success exit status + TerminateJobObject(jobObject, dwErrorCode); + return dwErrorCode; + } + } + + // Get the required buffer size first + currDirCnt = GetCurrentDirectory(0, NULL); + if (0 < currDirCnt) { + curr_dir = (wchar_t*) alloca(currDirCnt * sizeof(wchar_t)); + assert(curr_dir); + currDirCnt = GetCurrentDirectory(currDirCnt, curr_dir); + } + + if (0 == currDirCnt) { + dwErrorCode = GetLastError(); + // We have to explictly Terminate, passing in the error code + // simply closing the job would kill our own process with success exit status + TerminateJobObject(jobObject, dwErrorCode); + return dwErrorCode; + } + + if (logonHandle == NULL) { + createProcessResult = CreateProcess( + NULL, // ApplicationName + cmdLine, // command line + NULL, // process security attributes + NULL, // thread security attributes + TRUE, // inherit handles + 0, // creation flags + NULL, // environment + curr_dir, // current directory + &si, // startup info + &pi); // process info + } + else { + createProcessResult = CreateProcessAsUser( + logonHandle, // logon token handle + NULL, // Application handle + cmdLine, // command line + NULL, // process security attributes + NULL, // thread security attributes + FALSE, // inherit handles + CREATE_UNICODE_ENVIRONMENT, // creation flags + envBlock, // environment + curr_dir, // current directory + &si, // startup info + &pi); // process info + } + + if (FALSE == createProcessResult) { + dwErrorCode = GetLastError(); + if( envBlock != NULL ) { + DestroyEnvironmentBlock( envBlock ); + envBlock = NULL; + } + // We have to explictly Terminate, passing in the error code + // simply closing the job would kill our own process with success exit status + TerminateJobObject(jobObject, dwErrorCode); + + // This is tehnically dead code, we cannot reach this condition + return dwErrorCode; } CloseHandle(pi.hThread); @@ -162,10 +251,15 @@ DWORD createTask(__in PCWSTR jobObjName,__in PWSTR cmdLine) WaitForSingleObject( pi.hProcess, INFINITE ); if(GetExitCodeProcess(pi.hProcess, &exitCode) == 0) { - err = GetLastError(); + dwErrorCode = GetLastError(); } CloseHandle( pi.hProcess ); + if( envBlock != NULL ) { + DestroyEnvironmentBlock( envBlock ); + envBlock = NULL; + } + // Terminate job object so that all spawned processes are also killed. // This is needed because once this process closes the handle to the job // object and none of the spawned objects have the handle open (via @@ -173,21 +267,134 @@ DWORD createTask(__in PCWSTR jobObjName,__in PWSTR cmdLine) // program (say winutils task kill) to terminate this job object via its name. if(TerminateJobObject(jobObject, exitCode) == 0) { - err = GetLastError(); + dwErrorCode = GetLastError(); } - // comes here only on failure or TerminateJobObject + // comes here only on failure of TerminateJobObject CloseHandle(jobObject); - if(err != ERROR_SUCCESS) + if(dwErrorCode != ERROR_SUCCESS) { - return err; + return dwErrorCode; } return exitCode; } //---------------------------------------------------------------------------- -// Function: isTaskAlive +// Function: CreateTask +// +// Description: +// Creates a task via a jobobject. Outputs the +// appropriate information to stdout on success, or stderr on failure. +// +// Returns: +// ERROR_SUCCESS: On success +// GetLastError: otherwise +DWORD CreateTask(__in PCWSTR jobObjName,__in PWSTR cmdLine) +{ + // call with null logon in order to create tasks utilizing the current logon + return CreateTaskImpl( NULL, jobObjName, cmdLine ); +} +//---------------------------------------------------------------------------- +// Function: CreateTask +// +// Description: +// Creates a task via a jobobject. Outputs the +// appropriate information to stdout on success, or stderr on failure. +// +// Returns: +// ERROR_SUCCESS: On success +// GetLastError: otherwise +DWORD CreateTaskAsUser(__in PCWSTR jobObjName,__in PWSTR user, __in PWSTR pidFilePath, __in PWSTR cmdLine) +{ + DWORD err = ERROR_SUCCESS; + DWORD exitCode = EXIT_FAILURE; + ULONG authnPkgId; + HANDLE lsaHandle = INVALID_HANDLE_VALUE; + PROFILEINFO pi; + BOOL profileIsLoaded = FALSE; + FILE* pidFile = NULL; + + DWORD retLen = 0; + HANDLE logonHandle = NULL; + + err = EnablePrivilege(SE_TCB_NAME); + if( err != ERROR_SUCCESS ) { + fwprintf(stdout, L"INFO: The user does not have SE_TCB_NAME.\n"); + goto done; + } + err = EnablePrivilege(SE_ASSIGNPRIMARYTOKEN_NAME); + if( err != ERROR_SUCCESS ) { + fwprintf(stdout, L"INFO: The user does not have SE_ASSIGNPRIMARYTOKEN_NAME.\n"); + goto done; + } + err = EnablePrivilege(SE_INCREASE_QUOTA_NAME); + if( err != ERROR_SUCCESS ) { + fwprintf(stdout, L"INFO: The user does not have SE_INCREASE_QUOTA_NAME.\n"); + goto done; + } + err = EnablePrivilege(SE_RESTORE_NAME); + if( err != ERROR_SUCCESS ) { + fwprintf(stdout, L"INFO: The user does not have SE_RESTORE_NAME.\n"); + goto done; + } + + err = RegisterWithLsa(LOGON_PROCESS_NAME ,&lsaHandle); + if( err != ERROR_SUCCESS ) goto done; + + err = LookupKerberosAuthenticationPackageId( lsaHandle, &authnPkgId ); + if( err != ERROR_SUCCESS ) goto done; + + err = CreateLogonForUser(lsaHandle, + LOGON_PROCESS_NAME, + TOKEN_SOURCE_NAME, + authnPkgId, + user, + &logonHandle); + if( err != ERROR_SUCCESS ) goto done; + + err = LoadUserProfileForLogon(logonHandle, &pi); + if( err != ERROR_SUCCESS ) goto done; + profileIsLoaded = TRUE; + + // Create the PID file + + if (!(pidFile = _wfopen(pidFilePath, "w"))) { + err = GetLastError(); + goto done; + } + + if (0 > fprintf_s(pidFile, "%ls", jobObjName)) { + err = GetLastError(); + } + + fclose(pidFile); + + if (err != ERROR_SUCCESS) { + goto done; + } + + err = CreateTaskImpl(logonHandle, jobObjName, cmdLine); + +done: + if( profileIsLoaded ) { + UnloadProfileForLogon( logonHandle, &pi ); + profileIsLoaded = FALSE; + } + if( logonHandle != NULL ) { + CloseHandle(logonHandle); + } + + if (INVALID_HANDLE_VALUE != lsaHandle) { + UnregisterWithLsa(lsaHandle); + } + + return err; +} + + +//---------------------------------------------------------------------------- +// Function: IsTaskAlive // // Description: // Checks if a task is alive via a jobobject. Outputs the @@ -196,7 +403,7 @@ DWORD createTask(__in PCWSTR jobObjName,__in PWSTR cmdLine) // Returns: // ERROR_SUCCESS: On success // GetLastError: otherwise -DWORD isTaskAlive(const WCHAR* jobObjName, int* isAlive, int* procsInJob) +DWORD IsTaskAlive(const WCHAR* jobObjName, int* isAlive, int* procsInJob) { PJOBOBJECT_BASIC_PROCESS_ID_LIST procList; HANDLE jobObject = NULL; @@ -247,7 +454,7 @@ DWORD isTaskAlive(const WCHAR* jobObjName, int* isAlive, int* procsInJob) } //---------------------------------------------------------------------------- -// Function: killTask +// Function: KillTask // // Description: // Kills a task via a jobobject. Outputs the @@ -256,7 +463,7 @@ DWORD isTaskAlive(const WCHAR* jobObjName, int* isAlive, int* procsInJob) // Returns: // ERROR_SUCCESS: On success // GetLastError: otherwise -DWORD killTask(PCWSTR jobObjName) +DWORD KillTask(PCWSTR jobObjName) { HANDLE jobObject = OpenJobObject(JOB_OBJECT_TERMINATE, FALSE, jobObjName); if(jobObject == NULL) @@ -280,7 +487,7 @@ DWORD killTask(PCWSTR jobObjName) } //---------------------------------------------------------------------------- -// Function: printTaskProcessList +// Function: PrintTaskProcessList // // Description: // Prints resource usage of all processes in the task jobobject @@ -288,7 +495,7 @@ DWORD killTask(PCWSTR jobObjName) // Returns: // ERROR_SUCCESS: On success // GetLastError: otherwise -DWORD printTaskProcessList(const WCHAR* jobObjName) +DWORD PrintTaskProcessList(const WCHAR* jobObjName) { DWORD i; PJOBOBJECT_BASIC_PROCESS_ID_LIST procList; @@ -372,6 +579,21 @@ int Task(__in int argc, __in_ecount(argc) wchar_t *argv[]) { DWORD dwErrorCode = ERROR_SUCCESS; TaskCommandOption command = TaskInvalid; + wchar_t* cmdLine = NULL; + wchar_t buffer[16*1024] = L""; // 32K max command line + size_t charCountBufferLeft = sizeof (buffer)/sizeof(wchar_t); + int crtArgIndex = 0; + size_t argLen = 0; + size_t wscatErr = 0; + wchar_t* insertHere = NULL; + + enum { + ARGC_JOBOBJECTNAME = 2, + ARGC_USERNAME, + ARGC_PIDFILE, + ARGC_COMMAND, + ARGC_COMMAND_ARGS + }; if (!ParseCommandLine(argc, argv, &command)) { dwErrorCode = ERROR_INVALID_COMMAND_LINE; @@ -385,10 +607,57 @@ int Task(__in int argc, __in_ecount(argc) wchar_t *argv[]) { // Create the task jobobject // - dwErrorCode = createTask(argv[2], argv[3]); + dwErrorCode = CreateTask(argv[2], argv[3]); + if (dwErrorCode != ERROR_SUCCESS) + { + ReportErrorCode(L"CreateTask", dwErrorCode); + goto TaskExit; + } + } else if (command == TaskCreateAsUser) + { + // Create the task jobobject as a domain user + // createAsUser accepts an open list of arguments. All arguments after the command are + // to be passed as argumrnts to the command itself.Here we're concatenating all + // arguments after the command into a single arg entry. + // + cmdLine = argv[ARGC_COMMAND]; + if (argc > ARGC_COMMAND_ARGS) { + crtArgIndex = ARGC_COMMAND; + insertHere = buffer; + while (crtArgIndex < argc) { + argLen = wcslen(argv[crtArgIndex]); + wscatErr = wcscat_s(insertHere, charCountBufferLeft, argv[crtArgIndex]); + switch (wscatErr) { + case 0: + // 0 means success; + break; + case EINVAL: + dwErrorCode = ERROR_INVALID_PARAMETER; + goto TaskExit; + case ERANGE: + dwErrorCode = ERROR_INSUFFICIENT_BUFFER; + goto TaskExit; + default: + // This case is not MSDN documented. + dwErrorCode = ERROR_GEN_FAILURE; + goto TaskExit; + } + insertHere += argLen; + charCountBufferLeft -= argLen; + insertHere[0] = L' '; + insertHere += 1; + charCountBufferLeft -= 1; + insertHere[0] = 0; + ++crtArgIndex; + } + cmdLine = buffer; + } + + dwErrorCode = CreateTaskAsUser( + argv[ARGC_JOBOBJECTNAME], argv[ARGC_USERNAME], argv[ARGC_PIDFILE], cmdLine); if (dwErrorCode != ERROR_SUCCESS) { - ReportErrorCode(L"createTask", dwErrorCode); + ReportErrorCode(L"CreateTaskAsUser", dwErrorCode); goto TaskExit; } } else if (command == TaskIsAlive) @@ -397,10 +666,10 @@ int Task(__in int argc, __in_ecount(argc) wchar_t *argv[]) // int isAlive; int numProcs; - dwErrorCode = isTaskAlive(argv[2], &isAlive, &numProcs); + dwErrorCode = IsTaskAlive(argv[2], &isAlive, &numProcs); if (dwErrorCode != ERROR_SUCCESS) { - ReportErrorCode(L"isTaskAlive", dwErrorCode); + ReportErrorCode(L"IsTaskAlive", dwErrorCode); goto TaskExit; } @@ -412,27 +681,27 @@ int Task(__in int argc, __in_ecount(argc) wchar_t *argv[]) else { dwErrorCode = ERROR_TASK_NOT_ALIVE; - ReportErrorCode(L"isTaskAlive returned false", dwErrorCode); + ReportErrorCode(L"IsTaskAlive returned false", dwErrorCode); goto TaskExit; } } else if (command == TaskKill) { // Check if task jobobject // - dwErrorCode = killTask(argv[2]); + dwErrorCode = KillTask(argv[2]); if (dwErrorCode != ERROR_SUCCESS) { - ReportErrorCode(L"killTask", dwErrorCode); + ReportErrorCode(L"KillTask", dwErrorCode); goto TaskExit; } } else if (command == TaskProcessList) { // Check if task jobobject // - dwErrorCode = printTaskProcessList(argv[2]); + dwErrorCode = PrintTaskProcessList(argv[2]); if (dwErrorCode != ERROR_SUCCESS) { - ReportErrorCode(L"printTaskProcessList", dwErrorCode); + ReportErrorCode(L"PrintTaskProcessList", dwErrorCode); goto TaskExit; } } else @@ -453,10 +722,12 @@ void TaskUsage() // ProcessTree.isSetsidSupported() fwprintf(stdout, L"\ Usage: task create [TASKNAME] [COMMAND_LINE] |\n\ + task createAsUser [TASKNAME] [USERNAME] [PIDFILE] [COMMAND_LINE] |\n\ task isAlive [TASKNAME] |\n\ task kill [TASKNAME]\n\ task processList [TASKNAME]\n\ Creates a new task jobobject with taskname\n\ + Creates a new task jobobject with taskname as the user provided\n\ Checks if task jobobject is alive\n\ Kills task jobobject\n\ Prints to stdout a list of processes in the task\n\ diff --git hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestWinUtils.java hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestWinUtils.java index 588b217..953039d 100644 --- hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestWinUtils.java +++ hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestWinUtils.java @@ -20,10 +20,12 @@ import static org.junit.Assert.*; import static org.junit.Assume.assumeTrue; +import static org.junit.matchers.JUnitMatchers.containsString; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; +import java.io.FileWriter; import java.io.IOException; import org.apache.commons.io.FileUtils; @@ -33,7 +35,7 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; -import static org.junit.Assume.*; + import static org.hamcrest.CoreMatchers.*; /** @@ -521,4 +523,26 @@ public void testReadLink() throws IOException { assertThat(ece.getExitCode(), is(1)); } } + + @SuppressWarnings("deprecation") + @Test(timeout=10000) + public void testTaskCreate() throws IOException { + File batch = new File(TEST_DIR, "testTaskCreate.cmd"); + File proof = new File(TEST_DIR, "testTaskCreate.out"); + FileWriter fw = new FileWriter(batch); + String testNumber = String.format("%f", Math.random()); + fw.write(String.format("echo %s > \"%s\"", testNumber, proof.getAbsolutePath())); + fw.close(); + + assertFalse(proof.exists()); + + Shell.execCommand(Shell.WINUTILS, "task", "create", "testTaskCreate" + testNumber, + batch.getAbsolutePath()); + + assertTrue(proof.exists()); + + String outNumber = FileUtils.readFileToString(proof); + + assertThat(outNumber, containsString(testNumber)); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 034ec4f..b1792ff 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -894,6 +894,12 @@ public static final long DEFAULT_NM_LINUX_CONTAINER_CGROUPS_DELETE_TIMEOUT = 1000; + + /** + /* The Windows group that the windows-secure-container-executor should run as. + */ + public static final String NM_WINDOWS_SECURE_CONTAINER_GROUP = + NM_PREFIX + "windows-secure-container-executor.group"; /** T-file compression types used to compress aggregated logs.*/ public static final String NM_LOG_AGG_COMPRESSION_TYPE = diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java index 7391872..7dbceaf 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java @@ -79,6 +79,20 @@ public Configuration getConf() { public abstract void init() throws IOException; /** + * On Windows the ContainerLaunch creates a temporary empty jar to workaround the CLASSPATH length + * In a secure cluster this jar must be localized so that the container has access to it + * This function localizes on-demand the jar. + * + * @param classPathJar + * @param owner + * @throws IOException + */ + public void localizeClasspathJar(Path classPathJar, String owner) throws IOException { + // For the default container this is a no-op + // The WindowsSecureContainerExecutor overrides this + } + + /** * Prepare the environment for containers in this application to execute. * For $x in local.dirs * create $x/$user/$appId @@ -264,8 +278,8 @@ protected Path getPidFilePath(ContainerId containerId) { * and associate the given groupId in a process group. On * non-Windows, groupId is ignored. */ - protected static String[] getRunCommand(String command, String groupId, - Configuration conf) { + protected String[] getRunCommand(String command, String groupId, + String userName, Path pidFile, Configuration conf) { boolean containerSchedPriorityIsSet = false; int containerSchedPriorityAdjustment = YarnConfiguration.DEFAULT_NM_CONTAINER_EXECUTOR_SCHED_PRIORITY; @@ -396,5 +410,4 @@ public void run() { } } } - } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java index a7af1c5..23d2e72 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java @@ -61,7 +61,7 @@ private static final int WIN_MAX_PATH = 260; - private final FileContext lfs; + protected final FileContext lfs; public DefaultContainerExecutor() { try { @@ -75,11 +75,19 @@ public DefaultContainerExecutor() { this.lfs = lfs; } + protected void copyFile(Path src, Path dst, String owner) throws IOException { + lfs.util().copy(src, dst); + } + + protected void setScriptExecutable(Path script, String owner) throws IOException { + lfs.setPermission(script, ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION); + } + @Override public void init() throws IOException { // nothing to do or verify here } - + @Override public synchronized void startLocalizer(Path nmPrivateContainerTokensPath, InetSocketAddress nmAddr, String user, String appId, String locId, @@ -93,14 +101,14 @@ public synchronized void startLocalizer(Path nmPrivateContainerTokensPath, createUserLocalDirs(localDirs, user); createUserCacheDirs(localDirs, user); createAppDirs(localDirs, user, appId); - createAppLogDirs(appId, logDirs); + createAppLogDirs(appId, logDirs, user); // TODO: Why pick first app dir. The same in LCE why not random? Path appStorageDir = getFirstApplicationDir(localDirs, user, appId); String tokenFn = String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT, locId); Path tokenDst = new Path(appStorageDir, tokenFn); - lfs.util().copy(nmPrivateContainerTokensPath, tokenDst); + copyFile(nmPrivateContainerTokensPath, tokenDst, user); LOG.info("Copying from " + nmPrivateContainerTokensPath + " to " + tokenDst); lfs.setWorkingDirectory(appStorageDir); LOG.info("CWD set to " + appStorageDir + " = " + lfs.getWorkingDirectory()); @@ -129,30 +137,29 @@ public int launchContainer(Container container, Path appCacheDir = new Path(userdir, ContainerLocalizer.APPCACHE); Path appDir = new Path(appCacheDir, appIdStr); Path containerDir = new Path(appDir, containerIdStr); - createDir(containerDir, dirPerm, true); + createDir(containerDir, dirPerm, true, userName); } // Create the container log-dirs on all disks - createContainerLogDirs(appIdStr, containerIdStr, logDirs); + createContainerLogDirs(appIdStr, containerIdStr, logDirs, userName); Path tmpDir = new Path(containerWorkDir, YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR); - createDir(tmpDir, dirPerm, false); + createDir(tmpDir, dirPerm, false, userName); // copy launch script to work dir Path launchDst = new Path(containerWorkDir, ContainerLaunch.CONTAINER_SCRIPT); - lfs.util().copy(nmPrivateContainerScriptPath, launchDst); + copyFile(nmPrivateContainerScriptPath, launchDst, userName); // copy container tokens to work dir Path tokenDst = new Path(containerWorkDir, ContainerLaunch.FINAL_CONTAINER_TOKENS_FILE); - lfs.util().copy(nmPrivateTokensPath, tokenDst); + copyFile(nmPrivateTokensPath, tokenDst, userName); // Create new local launch wrapper script - LocalWrapperScriptBuilder sb = Shell.WINDOWS ? - new WindowsLocalWrapperScriptBuilder(containerIdStr, containerWorkDir) : - new UnixLocalWrapperScriptBuilder(containerWorkDir); + LocalWrapperScriptBuilder sb = getLocalWrapperScriptBuilder( + containerIdStr, containerWorkDir); // Fail fast if attempting to launch the wrapper script would fail due to // Windows path length limitation. @@ -178,14 +185,12 @@ public int launchContainer(Container container, // fork script ShellCommandExecutor shExec = null; try { - lfs.setPermission(launchDst, - ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION); - lfs.setPermission(sb.getWrapperScriptPath(), - ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION); + setScriptExecutable(launchDst, userName); + setScriptExecutable(sb.getWrapperScriptPath(), userName); // Setup command to run String[] command = getRunCommand(sb.getWrapperScriptPath().toString(), - containerIdStr, this.getConf()); + containerIdStr, userName, pidFile, this.getConf()); LOG.info("launchContainer: " + Arrays.toString(command)); shExec = new ShellCommandExecutor( @@ -241,7 +246,14 @@ public int launchContainer(Container container, return 0; } - private abstract class LocalWrapperScriptBuilder { + protected LocalWrapperScriptBuilder getLocalWrapperScriptBuilder( + String containerIdStr, Path containerWorkDir) { + return Shell.WINDOWS ? + new WindowsLocalWrapperScriptBuilder(containerIdStr, containerWorkDir) : + new UnixLocalWrapperScriptBuilder(containerWorkDir); + } + + protected abstract class LocalWrapperScriptBuilder { private final Path wrapperScriptPath; @@ -449,7 +461,7 @@ public void deleteAsUser(String user, Path subDir, Path... baseDirs) * $logdir/$user/$appId */ static final short LOGDIR_PERM = (short)0710; - private Path getFirstApplicationDir(List localDirs, String user, + protected Path getFirstApplicationDir(List localDirs, String user, String appId) { return getApplicationDir(new Path(localDirs.get(0)), user, appId); } @@ -472,8 +484,8 @@ private Path getFileCacheDir(Path base, String user) { ContainerLocalizer.FILECACHE); } - private void createDir(Path dirPath, FsPermission perms, - boolean createParent) throws IOException { + protected void createDir(Path dirPath, FsPermission perms, + boolean createParent, String user) throws IOException { lfs.mkdir(dirPath, perms, createParent); if (!perms.equals(perms.applyUMask(lfs.getUMask()))) { lfs.setPermission(dirPath, perms); @@ -493,7 +505,7 @@ void createUserLocalDirs(List localDirs, String user) for (String localDir : localDirs) { // create $local.dir/usercache/$user and its immediate parent try { - createDir(getUserCacheDir(new Path(localDir), user), userperms, true); + createDir(getUserCacheDir(new Path(localDir), user), userperms, true, user); } catch (IOException e) { LOG.warn("Unable to create the user directory : " + localDir, e); continue; @@ -529,7 +541,7 @@ void createUserCacheDirs(List localDirs, String user) Path localDirPath = new Path(localDir); final Path appDir = getAppcacheDir(localDirPath, user); try { - createDir(appDir, appCachePerms, true); + createDir(appDir, appCachePerms, true, user); appcacheDirStatus = true; } catch (IOException e) { LOG.warn("Unable to create app cache directory : " + appDir, e); @@ -537,7 +549,7 @@ void createUserCacheDirs(List localDirs, String user) // create $local.dir/usercache/$user/filecache final Path distDir = getFileCacheDir(localDirPath, user); try { - createDir(distDir, fileperms, true); + createDir(distDir, fileperms, true, user); distributedCacheDirStatus = true; } catch (IOException e) { LOG.warn("Unable to create file cache directory : " + distDir, e); @@ -570,7 +582,7 @@ void createAppDirs(List localDirs, String user, String appId) Path fullAppDir = getApplicationDir(new Path(localDir), user, appId); // create $local.dir/usercache/$user/appcache/$appId try { - createDir(fullAppDir, appperms, true); + createDir(fullAppDir, appperms, true, user); initAppDirStatus = true; } catch (IOException e) { LOG.warn("Unable to create app directory " + fullAppDir.toString(), e); @@ -586,7 +598,7 @@ void createAppDirs(List localDirs, String user, String appId) /** * Create application log directories on all disks. */ - void createAppLogDirs(String appId, List logDirs) + void createAppLogDirs(String appId, List logDirs, String user) throws IOException { boolean appLogDirStatus = false; @@ -595,7 +607,7 @@ void createAppLogDirs(String appId, List logDirs) // create $log.dir/$appid Path appLogDir = new Path(rootLogDir, appId); try { - createDir(appLogDir, appLogDirPerms, true); + createDir(appLogDir, appLogDirPerms, true, user); } catch (IOException e) { LOG.warn("Unable to create the app-log directory : " + appLogDir, e); continue; @@ -612,7 +624,7 @@ void createAppLogDirs(String appId, List logDirs) * Create application log directories on all disks. */ void createContainerLogDirs(String appId, String containerId, - List logDirs) throws IOException { + List logDirs, String user) throws IOException { boolean containerLogDirStatus = false; FsPermission containerLogDirPerms = new FsPermission(LOGDIR_PERM); @@ -621,7 +633,7 @@ void createContainerLogDirs(String appId, String containerId, Path appLogDir = new Path(rootLogDir, appId); Path containerLogDir = new Path(appLogDir, containerId); try { - createDir(containerLogDir, containerLogDirPerms, true); + createDir(containerLogDir, containerLogDirPerms, true, user); } catch (IOException e) { LOG.warn("Unable to create the container-log directory : " + appLogDir, e); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java index 804864e..6d9cdd7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java @@ -219,15 +219,7 @@ public void startLocalizer(Path nmPrivateContainerTokensPath, if (javaLibPath != null) { command.add("-Djava.library.path=" + javaLibPath); } - command.add(ContainerLocalizer.class.getName()); - command.add(user); - command.add(appId); - command.add(locId); - command.add(nmAddr.getHostName()); - command.add(Integer.toString(nmAddr.getPort())); - for (String dir : localDirs) { - command.add(dir); - } + ContainerLocalizer.buildMainArgs(command, runAsUser, appId, locId, nmAddr, localDirs); String[] commandArray = command.toArray(new String[command.size()]); ShellCommandExecutor shExec = new ShellCommandExecutor(commandArray); if (LOG.isDebugEnabled()) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/WindowsSecureContainerExecutor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/WindowsSecureContainerExecutor.java new file mode 100644 index 0000000..30beaf8 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/WindowsSecureContainerExecutor.java @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.nodemanager; + +import java.io.File; +import java.io.IOException; +import java.io.PrintStream; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.util.Shell; +import org.apache.hadoop.util.Shell.ShellCommandExecutor; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.LocalWrapperScriptBuilder; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; + +/** + * Windows secure container executor. Uses winutils task createAsUser. + * + */ +public class WindowsSecureContainerExecutor extends DefaultContainerExecutor { + + private static final Log LOG = LogFactory + .getLog(WindowsSecureContainerExecutor.class); + + private class WindowsSecureWrapperScriptBuilder + extends LocalWrapperScriptBuilder { + + public WindowsSecureWrapperScriptBuilder(Path containerWorkDir) { + super(containerWorkDir); + } + + @Override + protected void writeLocalWrapperScript(Path launchDst, Path pidFile, PrintStream pout) { + pout.format("@call \"%s\"", launchDst); + } + } + + private String nodeManagerGroup; + + @Override + public void setConf(Configuration conf) { + super.setConf(conf); + nodeManagerGroup = conf.get(YarnConfiguration.NM_WINDOWS_SECURE_CONTAINER_GROUP); + } + + @Override + protected String[] getRunCommand(String command, String groupId, + String userName, Path pidFile, Configuration conf) { + return new String[] { Shell.WINUTILS, "task", "createAsUser", groupId, userName, + pidFile.toString(), "cmd /c " + command }; + } + + @Override + protected LocalWrapperScriptBuilder getLocalWrapperScriptBuilder( + String containerIdStr, Path containerWorkDir) { + return new WindowsSecureWrapperScriptBuilder(containerWorkDir); + } + + @Override + protected void copyFile(Path src, Path dst, String owner) throws IOException { + super.copyFile(src, dst, owner); + lfs.setOwner(dst, owner, nodeManagerGroup); + } + + @Override + protected void createDir(Path dirPath, FsPermission perms, + boolean createParent, String owner) throws IOException { + super.createDir(dirPath, perms, createParent, owner); + lfs.setOwner(dirPath, owner, nodeManagerGroup); + } + + @Override + protected void setScriptExecutable(Path script, String owner) throws IOException { + super.setScriptExecutable(script, null); + lfs.setOwner(script, owner, nodeManagerGroup); + } + + @Override + public void localizeClasspathJar(Path classpathJar, String owner) throws IOException { + lfs.setOwner(classpathJar, owner, nodeManagerGroup); + } + + @Override + public void startLocalizer(Path nmPrivateContainerTokens, + InetSocketAddress nmAddr, String user, String appId, String locId, + List localDirs, List logDirs) throws IOException, + InterruptedException { + + createUserLocalDirs(localDirs, user); + createUserCacheDirs(localDirs, user); + createAppDirs(localDirs, user, appId); + createAppLogDirs(appId, logDirs, user); + + // TODO: Why pick first app dir. The same in LCE why not random? + Path appStorageDir = getFirstApplicationDir(localDirs, user, appId); + + String tokenFn = String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT, locId); + Path tokenDst = new Path(appStorageDir, tokenFn); + LOG.info("Copying from " + nmPrivateContainerTokens + " to " + tokenDst); + copyFile(nmPrivateContainerTokens, tokenDst, user); + + List command ; + String[] commandArray; + ShellCommandExecutor shExec; + + File cwdApp = new File(appStorageDir.toString()); + LOG.info(String.format("cwdApp: %s", cwdApp)); + + command = new ArrayList(); + + command.add(Shell.WINUTILS); + command.add("task"); + command.add("createAsUser"); + command.add("START_LOCALIZER_" + locId); + command.add(user); + command.add("nul:"); // PID file + + //use same jvm as parent + File jvm = new File(new File(System.getProperty("java.home"), "bin"), "java.exe"); + command.add(jvm.toString()); + + + // Build a temp classpath jar. See ContainerLaunch.sanitizeEnv(). + // Passing CLASSPATH explicitly is *way* too long for command line. + String classPath = System.getProperty("java.class.path"); + Map env = new HashMap(System.getenv()); + String classPathJar = FileUtil.createJarWithClassPath(classPath, appStorageDir, env); + localizeClasspathJar(new Path(classPathJar), user); + command.add("-classpath"); + command.add(classPathJar); + + String javaLibPath = System.getProperty("java.library.path"); + if (javaLibPath != null) { + command.add("-Djava.library.path=" + javaLibPath); + } + + ContainerLocalizer.buildMainArgs(command, user, appId, locId, nmAddr, localDirs); + commandArray = command.toArray(new String[command.size()]); + + shExec = new ShellCommandExecutor( + commandArray, cwdApp); + + shExec.execute(); + } +} + diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java index cee6a40..87a36c4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java @@ -766,6 +766,8 @@ public void sanitizeEnv(Map environment, Path pwd, String classPathJar = FileUtil.createJarWithClassPath( newClassPath.toString(), pwd, mergedEnv); + // In a secure cluster the classpath jar must be localized to grant access + this.exec.localizeClasspathJar(new Path(classPathJar), container.getUser()); environment.put(Environment.CLASSPATH.name(), classPathJar); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java index f05f49c..3525170 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java @@ -312,6 +312,31 @@ private LocalizerStatus createStatus() throws InterruptedException { status.addAllResources(currentResources); return status; } + + /** + * Adds the ContainerLocalizer arguments for a @{link ShellCommandExecutor}, + * as expected by ContainerLocalizer.main + * @param command the current ShellCommandExecutor command line + * @param user localization user + * @param appId localized app id + * @param locId localizer id + * @param nmAddr nodemanager address + * @param localDirs list of local dirs + */ + public static void buildMainArgs(List command, + String user, String appId, String locId, + InetSocketAddress nmAddr, List localDirs) { + + command.add(ContainerLocalizer.class.getName()); + command.add(user); + command.add(appId); + command.add(locId); + command.add(nmAddr.getHostName()); + command.add(Integer.toString(nmAddr.getPort())); + for(String dir : localDirs) { + command.add(dir); + } + } public static void main(String[] argv) throws Throwable { Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerExecutor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerExecutor.java index c04ec29..fd3634b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerExecutor.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerExecutor.java @@ -27,11 +27,13 @@ import static org.junit.Assert.*; public class TestContainerExecutor { + + private ContainerExecutor containerExecutor = new DefaultContainerExecutor(); @Test (timeout = 5000) public void testRunCommandNoPriority() throws Exception { Configuration conf = new Configuration(); - String[] command = ContainerExecutor.getRunCommand("echo", "group1", conf); + String[] command = containerExecutor.getRunCommand("echo", "group1", "user", null, conf); assertTrue("first command should be the run command for the platform", command[0].equals(Shell.WINUTILS) || command[0].equals("bash")); } @@ -40,7 +42,7 @@ public void testRunCommandNoPriority() throws Exception { public void testRunCommandwithPriority() throws Exception { Configuration conf = new Configuration(); conf.setInt(YarnConfiguration.NM_CONTAINER_EXECUTOR_SCHED_PRIORITY, 2); - String[] command = ContainerExecutor.getRunCommand("echo", "group1", conf); + String[] command = containerExecutor.getRunCommand("echo", "group1", "user", null, conf); if (Shell.WINDOWS) { // windows doesn't currently support assertEquals("first command should be the run command for the platform", @@ -54,7 +56,7 @@ public void testRunCommandwithPriority() throws Exception { // test with negative number conf.setInt(YarnConfiguration.NM_CONTAINER_EXECUTOR_SCHED_PRIORITY, -5); - command = ContainerExecutor.getRunCommand("echo", "group1", conf); + command = containerExecutor.getRunCommand("echo", "group1", "user", null, conf); if (Shell.WINDOWS) { // windows doesn't currently support assertEquals("first command should be the run command for the platform", diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDefaultContainerExecutor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDefaultContainerExecutor.java index 9c86c71..f6f0e9f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDefaultContainerExecutor.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDefaultContainerExecutor.java @@ -199,7 +199,7 @@ public void testDirPermissions() throws Exception { Assert.assertEquals(appDirPerm, stats.getPermission()); } - executor.createAppLogDirs(appId, logDirs); + executor.createAppLogDirs(appId, logDirs, user); for (String dir : logDirs) { FileStatus stats = lfs.getFileStatus(new Path(dir, appId)); @@ -277,7 +277,7 @@ public Object answer(InvocationOnMock invocationOnMock) mockExec.createUserLocalDirs(localDirs, appSubmitter); mockExec.createUserCacheDirs(localDirs, appSubmitter); mockExec.createAppDirs(localDirs, appSubmitter, appId); - mockExec.createAppLogDirs(appId, logDirs); + mockExec.createAppLogDirs(appId, logDirs, appSubmitter); Path scriptPath = new Path("file:///bin/echo"); Path tokensPath = new Path("file:///dev/null"); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/SecureContainer.apt.vm hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/SecureContainer.apt.vm new file mode 100644 index 0000000..1f9688a --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/SecureContainer.apt.vm @@ -0,0 +1,118 @@ +~~ Licensed under the Apache License, Version 2.0 (the "License"); +~~ you may not use this file except in compliance with the License. +~~ You may obtain a copy of the License at +~~ +~~ http://www.apache.org/licenses/LICENSE-2.0 +~~ +~~ Unless required by applicable law or agreed to in writing, software +~~ distributed under the License is distributed on an "AS IS" BASIS, +~~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +~~ See the License for the specific language governing permissions and +~~ limitations under the License. See accompanying LICENSE file. + + --- + YARN Secure Containers + --- + --- + ${maven.build.timestamp} + +YARN Secure Containers + +%{toc|section=1|fromDepth=0|toDepth=3} + +* {Overview} + + YARN containers in a secure cluster use the operating system facilities to offer + execution isolation for containers. Secure containers execute under the credentials + of the job user. The operating system enforces access restriction for the container. + The container must run as the use that submitted the application. + + Secure Containers work only in the context of secured YARN clusters. + + ** Container isolation requirements + + The container executor must access the local files and directories needed by the + container such as jars, configuration files, log files, shared objects etc. Although + it is launched by the NodeManager, the container should not have access to the + NodeManager private files and configuration. Container running applications + submitted by different users should be isolated and unable to access each other + files and directories. Similar requirements apply to other system non-file securable + objects like named pipes, critical sections, LPC queues, shared memory etc. + + + ** Linux Secure Container Executor + + On Linux environment the secure container executor is the <<>>. + It uses an external program called the <>> to launch the container. + This program has the <<>> access right flag set which allows it to launch + the container with the permissions of the YARN application user. + + *** Configuration + + The configured directories for <<>> and + <<>> must be owned by the configured NodeManager user + (<<>>) and group (<<>>). The permission set on these directories must + be <<>>. + + The <<>> program must be owned by <<>> and have the + permission set <<<---sr-s--->>>. + + To configure the <<>> to use the <<>> set the following + in the <>: + ++---+ + + yarn.nodemanager.container-executor.class + org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor + + + + yarn.nodemanager.linux-container-executor.group + hadoop + ++---+ + + Additionally the LCE requires the <<>> file, which is read by the + <<>> program. + ++---+ +yarn.nodemanager.linux-container-executor.group=#configured value of yarn.nodemanager.linux-container-executor.group +banned.users=#comma separated list of users who can not run applications +allowed.system.users=#comma separated list of allowed system users +min.user.id=1000#Prevent other super-users ++---+ + + + ** Windows Secure Container Executor + + The Windows environment secure container executor is the <<>>. + It uses the Windows S4U infrastructure to launch the container as the + YARN application user. + + *** Configuration + + To configure the <<>> to use the <<>> + set the following in the <>: + ++---+ + + yarn.nodemanager.container-executor.class + org.apache.hadoop.yarn.server.nodemanager.WindowsSecureContainerExecutor + + + + yarn.nodemanager.windows-secure-container-executor.group + hadoop + ++---+ + + The NodeManager must run as a member of the local <<>> group or as + <<>>. It is not enough for the NodeManager to simply impersonate such an user. + + *** Useful Links + + * {{{http://msdn.microsoft.com/en-us/magazine/cc188757.aspx}Exploring S4U Kerberos Extensions in Windows Server 2003}} + + * {{{https://issues.apache.org/jira/browse/YARN-1063}Winutils needs ability to create task as domain user}} + + * {{{https://issues.apache.org/jira/browse/YARN-1972}Implement secure Windows Container Executor}} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/index.apt.vm hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/index.apt.vm index 9f07b19..adb2a0a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/index.apt.vm +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/index.apt.vm @@ -52,6 +52,8 @@ MapReduce NextGen aka YARN aka MRv2 * {{{./WebApplicationProxy.html}Web Application Proxy}} * {{{./TimelineServer.html}YARN Timeline Server}} + + * {{{./SecureContainer.html}YARN Secure Containers}} * {{{../../hadoop-project-dist/hadoop-common/CLIMiniCluster.html}CLI MiniCluster}}