diff --git hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java index 976a93f..a7f5b9c 100644 --- hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java +++ hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java @@ -17,11 +17,14 @@ */ package org.apache.hadoop.io.nativeio; +import java.io.BufferedInputStream; import java.io.File; import java.io.FileDescriptor; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.io.RandomAccessFile; import java.lang.reflect.Field; import java.nio.ByteBuffer; @@ -833,4 +836,71 @@ public static void renameTo(File src, File dst) */ private static native void renameTo0(String src, String dst) throws NativeIOException; + + /** + * Wraps a process started by the winutils service helper. + * + */ + public static class WinutilsProcessStub extends Process { + + private final long hProcess; + private final long hThread; + private boolean disposed = false; + + private final InputStream stdErr; + private final InputStream stdOut; + private final OutputStream stdIn; + + public WinutilsProcessStub(long hProcess, long hThread, long hStdIn, long hStdOut, long hStdErr) { + this.hProcess = hProcess; + this.hThread = hThread; + + this.stdIn = new FileOutputStream(getFileDescriptorFromHandle(hStdIn)); + this.stdOut = new FileInputStream(getFileDescriptorFromHandle(hStdOut)); + this.stdErr = new FileInputStream(getFileDescriptorFromHandle(hStdErr)); + } + + private static native FileDescriptor getFileDescriptorFromHandle(long handle); + + @Override + public native void destroy(); + + @Override + public native int exitValue(); + + @Override + public InputStream getErrorStream() { + return stdErr; + } + @Override + public InputStream getInputStream() { + return stdOut; + } + @Override + public OutputStream getOutputStream() { + return stdIn; + } + @Override + public native int waitFor() throws InterruptedException; + + public synchronized native void dispose(); + + public native void resume() throws NativeIOException; + } + + public synchronized static WinutilsProcessStub createTaskAsUser( + String cwd, String jobName, String user, String pidFile, String cmdLine) + throws IOException { + if (!nativeLoaded) { + throw new IOException("NativeIO libraries are required for createTaskAsUser"); + } + synchronized(Shell.WindowsProcessLaunchLock) { + return createTaskAsUser0(cwd, jobName, user, pidFile, cmdLine); + } + } + + private static native WinutilsProcessStub createTaskAsUser0( + String cwd, String jobName, String user, String pidFile, String cmdLine) + throws NativeIOException; + } diff --git hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Shell.java hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Shell.java index fcdc021..67297cd 100644 --- hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Shell.java +++ hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Shell.java @@ -643,6 +643,18 @@ public String toString() { } } + public interface ICommandExecutor { + + void execute() throws IOException; + + int getExitCode() throws IOException; + + String getOutput() throws IOException; + + void dispose(); + + } + /** * A simple shell command executor. * @@ -651,7 +663,7 @@ public String toString() { * directory and the environment remains unchanged. The output of the command * is stored as-is and is expected to be small. */ - public static class ShellCommandExecutor extends Shell { + public static class ShellCommandExecutor extends Shell implements ICommandExecutor { private String[] command; private StringBuffer output; @@ -743,6 +755,10 @@ public String toString() { } return builder.toString(); } + + @Override + public void dispose() { + } } /** diff --git hadoop-common-project/hadoop-common/src/main/native/native.vcxproj hadoop-common-project/hadoop-common/src/main/native/native.vcxproj index 0d67e1e..e743788 100644 --- hadoop-common-project/hadoop-common/src/main/native/native.vcxproj +++ hadoop-common-project/hadoop-common/src/main/native/native.vcxproj @@ -99,6 +99,7 @@ + diff --git hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c index 95bb987..022c8c8 100644 --- hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c +++ hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c @@ -49,6 +49,7 @@ #include "file_descriptor.h" #include "errno_enum.h" +#include "winutils_process_stub.h" #define MMAP_PROT_READ org_apache_hadoop_io_nativeio_NativeIO_POSIX_MMAP_PROT_READ #define MMAP_PROT_WRITE org_apache_hadoop_io_nativeio_NativeIO_POSIX_MMAP_PROT_WRITE @@ -68,8 +69,13 @@ static jmethodID nioe_ctor; // Please see HADOOP-7156 for details. jobject pw_lock_object; +/* + * Throw a java.IO.IOException, generating the message from errno. + * NB. this is also used form winutils_process_stub.c + */ +extern void throw_ioe(JNIEnv* env, int errnum); + // Internal functions -static void throw_ioe(JNIEnv* env, int errnum); #ifdef UNIX static ssize_t get_pw_buflen(); #endif @@ -191,6 +197,12 @@ Java_org_apache_hadoop_io_nativeio_NativeIO_initNative( errno_enum_init(env); PASS_EXCEPTIONS_GOTO(env, error); #endif + +#ifdef WINDOWS + winutils_process_stub_init(env); + PASS_EXCEPTIONS_GOTO(env, error); +#endif + return; error: // these are all idempodent and safe to call even if the @@ -203,6 +215,9 @@ error: #ifdef UNIX errno_enum_deinit(env); #endif +#ifdef WINDOWS + winutils_process_stub_deinit(env); +#endif } /* @@ -766,7 +781,7 @@ cleanup: /* * Throw a java.IO.IOException, generating the message from errno. */ -static void throw_ioe(JNIEnv* env, int errnum) +void throw_ioe(JNIEnv* env, int errnum) { #ifdef UNIX char message[80]; @@ -1072,6 +1087,85 @@ JNIEnv *env, jclass clazz) #endif } + +/* + * Class: org_apache_hadoop_io_nativeio_NativeIO + * Method: createTaskAsUser + * Signature: (Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String)Lorg/apache/hadoop/io/nativeio/NativeIO$WinutilsProcessStub + */ +JNIEXPORT jobject JNICALL +Java_org_apache_hadoop_io_nativeio_NativeIO_createTaskAsUser0(JNIEnv* env, + jclass clazz, jstring cwd, jstring jobName, jstring user, jstring pidFile, jstring cmdLine) { +#ifdef UNIX + THROW(env, "java/io/IOException", + "The function createTaskAsUser is not supported on Unix"); + return -1; +#endif + +#ifdef WINDOWS + LPCWSTR lpszCwd = NULL, lpszJobName = NULL, + lpszUser = NULL, lpszPidFile = NULL, lpszCmdLine = NULL; + DWORD dwError = ERROR_SUCCESS; + HANDLE hProcess = INVALID_HANDLE_VALUE, + hThread = INVALID_HANDLE_VALUE, + hStdIn = INVALID_HANDLE_VALUE, + hStdOut = INVALID_HANDLE_VALUE, + hStdErr = INVALID_HANDLE_VALUE; + jobject ret = NULL; + + lpszCwd = (LPCWSTR) (*env)->GetStringChars(env, cwd, NULL); + if (!lpszCwd) goto done; // exception was thrown + + lpszJobName = (LPCWSTR) (*env)->GetStringChars(env, jobName, NULL); + if (!lpszJobName) goto done; // exception was thrown + + lpszUser = (LPCWSTR) (*env)->GetStringChars(env, user, NULL); + if (!lpszUser) goto done; // exception was thrown + + lpszPidFile = (LPCWSTR) (*env)->GetStringChars(env, pidFile, NULL); + if (!lpszPidFile) goto done; // exception was thrown + + lpszCmdLine = (LPCWSTR) (*env)->GetStringChars(env, cmdLine, NULL); + if (!lpszCmdLine) goto done; // exception was thrown + + LogDebugMessage(L"createTaskAsUser: cwd:%s job:%s user:%s pid:%s cmd:%s\n", + lpszCwd, lpszJobName, lpszUser, lpszPidFile, lpszCmdLine); + + dwError = RpcCall_TaskCreateAsUser(lpszCwd, lpszJobName, lpszUser, lpszPidFile, lpszCmdLine, + &hProcess, &hThread, &hStdIn, &hStdOut, &hStdErr); + + if (ERROR_SUCCESS == dwError) { + ret = winutils_process_stub_create(env, (jlong) hProcess, (jlong) hThread, + (jlong) hStdIn, (jlong) hStdOut, (jlong) hStdErr); + + if (NULL == ret) { + TerminateProcess(hProcess, EXIT_FAILURE); + CloseHandle(hThread); + CloseHandle(hProcess); + CloseHandle(hStdIn); + CloseHandle(hStdOut); + CloseHandle(hStdErr); + } + } + +done: + + if (lpszCwd) (*env)->ReleaseStringChars(env, cwd, lpszCwd); + if (lpszJobName) (*env)->ReleaseStringChars(env, jobName, lpszJobName); + if (lpszUser) (*env)->ReleaseStringChars(env, user, lpszUser); + if (lpszPidFile) (*env)->ReleaseStringChars(env, pidFile, lpszPidFile); + if (lpszCmdLine) (*env)->ReleaseStringChars(env, cmdLine, lpszCmdLine); + + if (dwError != ERROR_SUCCESS) { + throw_ioe (env, dwError); + } + + return ret; + +#endif +} + + /** * vim: sw=2: ts=2: et: */ diff --git hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/winutils_process_stub.c hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/winutils_process_stub.c new file mode 100644 index 0000000..d8afcca --- /dev/null +++ hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/winutils_process_stub.c @@ -0,0 +1,189 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with this +* work for additional information regarding copyright ownership. The ASF +* licenses this file to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +* License for the specific language governing permissions and limitations under +* the License. +*/ + +#include +#include "org_apache_hadoop.h" +#include "winutils_process_stub.h" +#include "winutils.h" +#include "file_descriptor.h" + +// class of org.apache.hadoop.io.nativeio.NativeIO.WinutilsProcessStub +static jclass wps_class = NULL; + + +static jmethodID wps_constructor = NULL; +static jfieldID wps_hProcess = NULL; +static jfieldID wps_hThread = NULL; +static jfieldID wps_disposed = NULL; + +extern void throw_ioe(JNIEnv* env, int errnum); + +void winutils_process_stub_init(JNIEnv *env) { + if (wps_class != NULL) return; // already initted + + wps_class = (*env)->FindClass(env, WINUTILS_PROCESS_STUB_CLASS); + PASS_EXCEPTIONS(env); + wps_class = (*env)->NewGlobalRef(env, wps_class); + + wps_hProcess = (*env)->GetFieldID(env, wps_class, "hProcess", "J"); + PASS_EXCEPTIONS(env); + + wps_hThread = (*env)->GetFieldID(env, wps_class, "hThread", "J"); + PASS_EXCEPTIONS(env); + + wps_disposed = (*env)->GetFieldID(env, wps_class, "disposed", "Z"); + PASS_EXCEPTIONS(env); + + wps_constructor = (*env)->GetMethodID(env, wps_class, "", "(JJJJJ)V"); + + LogDebugMessage(L"winutils_process_stub_init\n"); +} + +void winutils_process_stub_deinit(JNIEnv *env) { + if (wps_class != NULL) { + (*env)->DeleteGlobalRef(env, wps_class); + wps_class = NULL; + } + wps_hProcess = NULL; + wps_hThread = NULL; + wps_disposed = NULL; + wps_constructor = NULL; + LogDebugMessage(L"winutils_process_stub_deinit\n"); +} + +jobject winutils_process_stub_create(JNIEnv *env, + jlong hProcess, jlong hThread, jlong hStdIn, jlong hStdOut, jlong hStdErr) { + jobject obj = (*env)->NewObject(env, wps_class, wps_constructor, + hProcess, hThread, hStdIn, hStdOut, hStdErr); + PASS_EXCEPTIONS_RET(env, NULL); + + LogDebugMessage(L"winutils_process_stub_create: %p\n", obj); + + return obj; +} + + +/* + * native void destroy(); + * + * The "00024" in the function name is an artifact of how JNI encodes + * special characters. U+0024 is '$'. + */ +JNIEXPORT void JNICALL +Java_org_apache_hadoop_io_nativeio_NativeIO_00024WinutilsProcessStub_destroy( + JNIEnv *env, jobject objSelf) { + + HANDLE hProcess = (HANDLE)(*env)->GetLongField(env, objSelf, wps_hProcess); + LogDebugMessage(L"TerminateProcess: %x\n", hProcess); + TerminateProcess(hProcess, EXIT_FAILURE); +} + +/* + * native void waitFor(); + * + * The "00024" in the function name is an artifact of how JNI encodes + * special characters. U+0024 is '$'. + */ +JNIEXPORT void JNICALL +Java_org_apache_hadoop_io_nativeio_NativeIO_00024WinutilsProcessStub_waitFor( + JNIEnv *env, jobject objSelf) { + + HANDLE hProcess = (HANDLE)(*env)->GetLongField(env, objSelf, wps_hProcess); + LogDebugMessage(L"WaitForSingleObject: %x\n", hProcess); + WaitForSingleObject(hProcess, INFINITE); +} + + + +/* + * native void resume(); + * + * The "00024" in the function name is an artifact of how JNI encodes + * special characters. U+0024 is '$'. + */ +JNIEXPORT void JNICALL +Java_org_apache_hadoop_io_nativeio_NativeIO_00024WinutilsProcessStub_resume( + JNIEnv *env, jobject objSelf) { + + DWORD dwError; + HANDLE hThread = (HANDLE)(*env)->GetLongField(env, objSelf, wps_hThread); + if (-1 == ResumeThread(hThread)) { + dwError = GetLastError(); + LogDebugMessage(L"ResumeThread: %x error:%d\n", hThread, dwError); + throw_ioe(env, dwError); + } +} + +/* + * native int exitValue(); + * + * The "00024" in the function name is an artifact of how JNI encodes + * special characters. U+0024 is '$'. + */ +JNIEXPORT jint JNICALL +Java_org_apache_hadoop_io_nativeio_NativeIO_00024WinutilsProcessStub_exitValue( + JNIEnv *env, jobject objSelf) { + + DWORD exitCode; + HANDLE hProcess = (HANDLE)(*env)->GetLongField(env, objSelf, wps_hProcess); + GetExitCodeProcess(hProcess, &exitCode); + LogDebugMessage(L"GetExitCodeProcess: %x :%d\n", hProcess, exitCode); + + return exitCode; +} + + +/* + * native void dispose(); + * + * The "00024" in the function name is an artifact of how JNI encodes + * special characters. U+0024 is '$'. + */ +JNIEXPORT void JNICALL +Java_org_apache_hadoop_io_nativeio_NativeIO_00024WinutilsProcessStub_dispose( + JNIEnv *env, jobject objSelf) { + + HANDLE hProcess, hThread; + + jboolean disposed = (*env)->GetBooleanField(env, objSelf, wps_disposed); + + if (JNI_TRUE != disposed) { + hProcess = (HANDLE)(*env)->GetLongField(env, objSelf, wps_hProcess); + hThread = (HANDLE)(*env)->GetLongField(env, objSelf, wps_hThread); + + CloseHandle(hProcess); + CloseHandle(hThread); + (*env)->SetBooleanField(env, objSelf, wps_disposed, JNI_TRUE); + LogDebugMessage(L"disposed: %p\n", objSelf); + } +} + + +/* + * native static FileDescriptor getFileDescriptorFromHandle(long handle); + * + * The "00024" in the function name is an artifact of how JNI encodes + * special characters. U+0024 is '$'. + */ +JNIEXPORT jobject JNICALL +Java_org_apache_hadoop_io_nativeio_NativeIO_00024WinutilsProcessStub_getFileDescriptorFromHandle( + JNIEnv *env, jclass klass, jlong handle) { + + LogDebugMessage(L"getFileDescriptorFromHandle: %x\n", handle); + return fd_create(env, (long) handle); +} + diff --git hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/winutils_process_stub.h hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/winutils_process_stub.h new file mode 100644 index 0000000..6ab8ad6 --- /dev/null +++ hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/winutils_process_stub.h @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + + +#define WINUTILS_PROCESS_STUB_CLASS "org/apache/hadoop/io/nativeio/NativeIO$WinutilsProcessStub" + +void winutils_process_stub_init(JNIEnv *env); +void winutils_process_stub_deinit(JNIEnv *env); +jobject winutils_process_stub_create(JNIEnv *env, + jlong hProcess, jlong hThread, jlong hStdIn, jlong hStdOut, jlong hStdErr); + + diff --git hadoop-common-project/hadoop-common/src/main/native/src/org_apache_hadoop.h hadoop-common-project/hadoop-common/src/main/native/src/org_apache_hadoop.h index 92a6b27..3fd5a58 100644 --- hadoop-common-project/hadoop-common/src/main/native/src/org_apache_hadoop.h +++ hadoop-common-project/hadoop-common/src/main/native/src/org_apache_hadoop.h @@ -32,6 +32,7 @@ #define UNIX #endif + /* A helper macro to 'throw' a java exception. */ #define THROW(env, exception_name, message) \ { \ diff --git hadoop-common-project/hadoop-common/src/main/winutils/client.c hadoop-common-project/hadoop-common/src/main/winutils/client.c new file mode 100644 index 0000000..3b8fda1 --- /dev/null +++ hadoop-common-project/hadoop-common/src/main/winutils/client.c @@ -0,0 +1,162 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with this +* work for additional information regarding copyright ownership. The ASF +* licenses this file to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +* License for the specific language governing permissions and limitations under +* the License. +*/ + +#include "winutils.h" +#include +#include +#include "hdpwinutilsvc_h.h" + +#pragma comment(lib, "Rpcrt4.lib") +#pragma comment(lib, "advapi32.lib") + +static ACCESS_MASK CLIENT_MASK = 1; + + +VOID ReportClientError(LPWSTR lpszLocation, DWORD dwError) { + LPWSTR debugMsg = NULL; + int len; + WCHAR hexError[32]; + HRESULT hr; + + if (IsDebuggerPresent()) { + len = FormatMessageW( + FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM, + NULL, dwError, + MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), + (LPWSTR)&debugMsg, 0, NULL); + + LogDebugMessage(L"%s: %s: %x: %.*s\n", GetSystemTimeString(), lpszLocation, dwError, len, debugMsg); + } + + if (NULL != debugMsg) LocalFree(debugMsg); +} + +DWORD RpcCall_TaskCreateAsUser( + LPCWSTR cwd, LPCWSTR jobName, + LPCWSTR user, LPCWSTR pidFile, LPCWSTR cmdLine, + HANDLE* phProcess, HANDLE* phThread, HANDLE* phStdIn, HANDLE* phStdOut, HANDLE* phStdErr) +{ + DWORD dwError = EXIT_FAILURE; + RPC_STATUS status; + LPWSTR lpszStringBinding = NULL; + ULONG ulCode; + DWORD dwSelfPid = GetCurrentProcessId(); + CREATE_PROCESS_REQUEST request; + CREATE_PROCESS_RESPONSE *response = NULL; + RPC_SECURITY_QOS_V3 qos; + PSID pLocalSystemSid = NULL; + SID_IDENTIFIER_AUTHORITY authNT = SECURITY_NT_AUTHORITY; + + if (!AllocateAndInitializeSid(&authNT, 1, + SECURITY_LOCAL_SYSTEM_RID, + 0, 0, 0, 0, 0, 0, 0, + &pLocalSystemSid)) { + dwError = GetLastError(); + ReportClientError(L"AllocateAndInitializeSid", dwError); + goto done; + } + + ZeroMemory(&qos, sizeof(qos)); + qos.Version = RPC_C_SECURITY_QOS_VERSION_3; + qos.Capabilities = RPC_C_QOS_CAPABILITIES_LOCAL_MA_HINT | RPC_C_QOS_CAPABILITIES_MUTUAL_AUTH; + qos.IdentityTracking = RPC_C_QOS_IDENTITY_DYNAMIC; + qos.ImpersonationType = RPC_C_IMP_LEVEL_DEFAULT; + qos.Sid = pLocalSystemSid; + + ZeroMemory(&request, sizeof(request)); + request.cwd = cwd; + request.jobName = jobName; + request.user = user; + request.pidFile = pidFile; + request.cmdLine = cmdLine; + + status = RpcStringBindingCompose(NULL, + SVCBINDING, + NULL, + SVCNAME, + NULL, + &lpszStringBinding); + if (RPC_S_OK != status) { + ReportClientError(L"RpcStringBindingCompose", status); + dwError = status; + goto done; + } + + status = RpcBindingFromStringBinding(lpszStringBinding, &hHdpWinutilsSvcBinding); + + if (RPC_S_OK != status) { + ReportClientError(L"RpcBindingFromStringBinding", status); + dwError = status; + goto done; + } + + status = RpcBindingSetAuthInfoEx( + hHdpWinutilsSvcBinding, + NULL, + RPC_C_AUTHN_LEVEL_PKT_PRIVACY, // AuthnLevel + RPC_C_AUTHN_WINNT, // AuthnSvc + NULL, // AuthnIdentity (self) + RPC_C_AUTHZ_NONE, // AuthzSvc + &qos); + if (RPC_S_OK != status) { + ReportClientError(L"RpcBindingSetAuthInfoEx", status); + dwError = status; + goto done; + } + + RpcTryExcept { + dwError = WinutilsCreateProcessAsUser(dwSelfPid, &request, &response); + } + RpcExcept(1) { + ulCode = RpcExceptionCode(); + ReportClientError(L"RpcExcept", ulCode); + dwError = (DWORD) ulCode; + } + RpcEndExcept; + + if (ERROR_SUCCESS == dwError) { + *phProcess = response->hProcess; + *phThread = response->hThread; + *phStdIn = response->hStdIn; + *phStdOut = response->hStdOut; + *phStdErr = response->hStdErr; + } + + // From here on forward we do no change dwError even on RPC cleanup errors + status = RpcBindingFree(&hHdpWinutilsSvcBinding); + if (RPC_S_OK != status) { + ReportClientError(L"RpcBindingFree", status); + goto done; + } + +done: + if (pLocalSystemSid) FreeSid(pLocalSystemSid); + + if (NULL != response) { + MIDL_user_free(response); + } + + if (NULL != lpszStringBinding) { + status = RpcStringFree(&lpszStringBinding); + if (RPC_S_OK != status) { + ReportClientError(L"RpcStringFree", status); + } + } + + return dwError; +} + diff --git hadoop-common-project/hadoop-common/src/main/winutils/config.cpp hadoop-common-project/hadoop-common/src/main/winutils/config.cpp new file mode 100644 index 0000000..8524f7c --- /dev/null +++ hadoop-common-project/hadoop-common/src/main/winutils/config.cpp @@ -0,0 +1,133 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with this +* work for additional information regarding copyright ownership. The ASF +* licenses this file to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +* License for the specific language governing permissions and limitations under +* the License. +*/ + +#include "winutils.h" +#include +#import "msxml6.dll" + + +#define YARN_SITE_XML_PATH L"%HADOOP_CONF_DIR%\\yarn-site.xml" +#define YARN_DEFAULT_XML_PATH L"%HADOOP_CONF_DIR%\\yarn-default.xml" + + +#define ERROR_CHECK_HRESULT_DONE(hr, message) \ + if (FAILED(hr)) { \ + dwError = (DWORD) hr; \ + LogDebugMessage(L"%s: %x", message, hr); \ + goto done; \ + } + +DWORD GetConfigValue(__in LPCWSTR keyName, + __out size_t* len, __out_bcount(len) LPCWSTR* value) { + + DWORD dwError = ERROR_SUCCESS; + WCHAR xmlPath[MAX_PATH]; + + *len = 0; + *value = NULL; + + if (0 == ExpandEnvironmentStrings(YARN_SITE_XML_PATH, xmlPath, MAX_PATH)) { + dwError = GetLastError(); + goto done; + } + + dwError = GetConfigValueFromXmlFile(xmlPath, keyName, len, value); + if (*len) { + goto done; + } + + if (0 == ExpandEnvironmentStrings(YARN_DEFAULT_XML_PATH, xmlPath, MAX_PATH)) { + dwError = GetLastError(); + goto done; + } + + dwError = GetConfigValueFromXmlFile(xmlPath, keyName, len, value); + +done: + if (*len) { + LogDebugMessage(L"GetConfigValue:%d key:%s len:%d value:%.*s from:%s\n", dwError, keyName, *len, *len, *value, xmlPath); + } + return dwError; +} + + +DWORD GetConfigValueFromXmlFile(__in LPCWSTR xmlFile, __in LPCWSTR keyName, + __out size_t* outLen, __out_bcount(len) LPCWSTR* outValue) { + + DWORD dwError = ERROR_SUCCESS; + HRESULT hr; + WCHAR keyXsl[8192]; + size_t len = 0; + LPWSTR value = NULL; + + *outLen = 0; + *outValue = NULL; + + hr = StringCbPrintf(keyXsl, sizeof(keyXsl), L"//configuration/property[name='%s']/value/text()", keyName); + ERROR_CHECK_HRESULT_DONE(hr, L"StringCbPrintf"); + + hr = CoInitialize(NULL); + ERROR_CHECK_HRESULT_DONE(hr, L"CoInitialize"); + + try { + MSXML2::IXMLDOMDocument2Ptr pDoc; + hr = pDoc.CreateInstance(__uuidof(MSXML2::DOMDocument60), NULL, CLSCTX_INPROC_SERVER); + ERROR_CHECK_HRESULT_DONE(hr, L"CreateInstance"); + + pDoc->async = VARIANT_FALSE; + pDoc->validateOnParse = VARIANT_FALSE; + pDoc->resolveExternals = VARIANT_FALSE; + + _variant_t file(xmlFile); + + if (VARIANT_FALSE == pDoc->load(file)) { + dwError = pDoc->parseError->errorCode; + LogDebugMessage(L"load %s failed:%d %s\n", xmlFile, dwError, + static_cast(pDoc->parseError->Getreason())); + goto done; + } + + MSXML2::IXMLDOMElementPtr pRoot = pDoc->documentElement; + MSXML2::IXMLDOMNodePtr keyNode = pRoot->selectSingleNode(keyXsl); + + if (keyNode) { + _bstr_t bstrValue = static_cast<_bstr_t>(keyNode->nodeValue); + len = bstrValue.length(); + value = (LPWSTR) LocalAlloc(LPTR, (len+1) * sizeof(WCHAR)); + LPCWSTR lpwszValue = static_cast(bstrValue); + memcpy(value, lpwszValue, (len) * sizeof(WCHAR)); + LogDebugMessage(L"key:%s :%.*s [%s]\n", keyName, len, value, lpwszValue); + *outLen = len; + *outValue = value; + } + else { + LogDebugMessage(L"node Xpath:%s not found in:%s\n", keyXsl, xmlFile); + } + } + catch(_com_error errorObject) { + dwError = errorObject.Error(); + LogDebugMessage(L"catch _com_error:%x %s\n", dwError, errorObject.ErrorMessage()); + goto done; + } + +done: + CoUninitialize(); + + return dwError; +} + + diff --git hadoop-common-project/hadoop-common/src/main/winutils/hdpwinutilsvc.idl hadoop-common-project/hadoop-common/src/main/winutils/hdpwinutilsvc.idl new file mode 100644 index 0000000..2d8c2b3 --- /dev/null +++ hadoop-common-project/hadoop-common/src/main/winutils/hdpwinutilsvc.idl @@ -0,0 +1,35 @@ +import "oaidl.idl"; +import "ocidl.idl"; + +[ + uuid(0492311C-1718-4F53-A6EB-86AD7039988D), + version(1.0), + pointer_default(unique), + implicit_handle(handle_t hHdpWinutilsSvcBinding), + endpoint("ncalrpc:[hdpwinutilsvc]"), +] +interface Hdpwinutilsvc +{ + typedef struct { + [string] const wchar_t* cwd; + [string] const wchar_t* jobName; + [string] const wchar_t* user; + [string] const wchar_t* pidFile; + [string] const wchar_t* cmdLine; + } CREATE_PROCESS_REQUEST; + + typedef struct { + LONG_PTR hProcess; + LONG_PTR hThread; + LONG_PTR hStdIn; + LONG_PTR hStdOut; + LONG_PTR hStdErr; + } CREATE_PROCESS_RESPONSE; + + + error_status_t WinutilsCreateProcessAsUser( + [in] int nmPid, + [in] CREATE_PROCESS_REQUEST *request, + [out] CREATE_PROCESS_RESPONSE **response); + +} \ No newline at end of file diff --git hadoop-common-project/hadoop-common/src/main/winutils/include/winutils.h hadoop-common-project/hadoop-common/src/main/winutils/include/winutils.h index bae754c..628d435 100644 --- hadoop-common-project/hadoop-common/src/main/winutils/include/winutils.h +++ hadoop-common-project/hadoop-common/src/main/winutils/include/winutils.h @@ -30,6 +30,10 @@ #include #include +#ifdef __cplusplus +extern "C" { +#endif + enum EXIT_CODE { /* Common success exit code shared among all utilities */ @@ -178,3 +182,47 @@ DWORD LoadUserProfileForLogon(__in HANDLE logonHandle, __out PROFILEINFO * pi); DWORD UnloadProfileForLogon(__in HANDLE logonHandle, __in PROFILEINFO * pi); +DWORD RunService(__in int argc, __in_ecount(argc) wchar_t *argv[]); +void ServiceUsage(); + +LPCWSTR GetSystemTimeString(); + +VOID LogDebugMessage(LPCWSTR format, ...); + +DWORD SplitStringIgnoreSpaceW(__in size_t len, __in_bcount(len) LPCWSTR source, + __in WCHAR deli, + __out size_t* count, __out_ecount(count) WCHAR*** out); + +DWORD GetConfigValue( + __in LPCWSTR keyName, + __out size_t* len, + __out_bcount(len) LPCWSTR* value); +DWORD GetConfigValueFromXmlFile( + __in LPCWSTR xmlFile, + __in LPCWSTR keyName, + __out size_t* len, + __out_bcount(len) LPCWSTR* value); + + +DWORD BuildServiceSecurityDescriptor( + __in ACCESS_MASK accessMask, + __in size_t grantSidCount, + __in_ecount(grantSidCount) PSID* pGrantSids, + __in size_t denySidCount, + __in_ecount(denySidCount) PSID* pDenySids, + __out PSECURITY_DESCRIPTOR* pSD); + + +#define SVCNAME TEXT("hdpwinutilsvc") +#define SVCBINDING TEXT("ncalrpc") + +int RpcCall_TaskCreateAsUser( + LPCWSTR cwd, LPCWSTR jobName, + LPCWSTR user, LPCWSTR pidFile, LPCWSTR cmdLine, + HANDLE* phProcess, HANDLE* phThread, HANDLE* phStdIn, HANDLE* phStdOut, HANDLE* phStdErr); + +#ifdef __cplusplus +} +#endif + + diff --git hadoop-common-project/hadoop-common/src/main/winutils/libwinutils.c hadoop-common-project/hadoop-common/src/main/winutils/libwinutils.c index 3de458c..873ff68 100644 --- hadoop-common-project/hadoop-common/src/main/winutils/libwinutils.c +++ hadoop-common-project/hadoop-common/src/main/winutils/libwinutils.c @@ -19,9 +19,15 @@ #pragma comment(lib, "netapi32.lib") #pragma comment(lib, "Secur32.lib") #pragma comment(lib, "Userenv.lib") +#pragma comment(lib, "Ntdsapi.lib") + #include "winutils.h" +#include +#include #include #include +#include +#include /* * The array of 12 months' three-letter abbreviations @@ -1706,10 +1712,12 @@ void ReportErrorCode(LPCWSTR func, DWORD err) (LPWSTR)&msg, 0, NULL); if (len > 0) { + LogDebugMessage(L"%s error (%d): %s\n", func, err, msg); fwprintf(stderr, L"%s error (%d): %s\n", func, err, msg); } else { + LogDebugMessage(L"%s error code: %d.\n", func, err); fwprintf(stderr, L"%s error code: %d.\n", func, err); } if (msg != NULL) LocalFree(msg); @@ -2026,6 +2034,8 @@ done: return loadProfileStatus; } + + DWORD UnloadProfileForLogon(__in HANDLE logonHandle, __in PROFILEINFO * pi) { DWORD touchProfileStatus = ERROR_ASSERTION_FAILURE; // Failure to set status should trigger error @@ -2046,3 +2056,384 @@ DWORD UnloadProfileForLogon(__in HANDLE logonHandle, __in PROFILEINFO * pi) done: return touchProfileStatus; } + + +LPCWSTR GetSystemTimeString() { + __declspec(thread) static WCHAR buffer[1024]; + DWORD dwError; + FILETIME ftime; + SYSTEMTIME systime; + LARGE_INTEGER counter, frequency; + int subSec; + double qpc; + HRESULT hr; + buffer[0] = L'\0'; + + // GetSystemTimePreciseAsFileTime is only available in Win8+ and our libs do not link against it + + GetSystemTimeAsFileTime(&ftime); + + if (!FileTimeToSystemTime(&ftime, &systime)) { + dwError = GetLastError(); + LogDebugMessage(L"FileTimeToSystemTime error:%d\n", dwError); + goto done; + } + + // Get the ms from QPC. GetSystemTimeAdjustment is ignored... + + QueryPerformanceCounter(&counter); + QueryPerformanceFrequency(&frequency); + + qpc = (double) counter.QuadPart / (double) frequency.QuadPart; + subSec = ((qpc - (long)qpc) * 1000000); + + hr = StringCbPrintf(buffer, sizeof(buffer), L"%02d:%02d:%02d.%06d", + (int)systime.wHour, (int)systime.wMinute, (int)systime.wSecond, (int)subSec); + + if (FAILED(hr)) { + LogDebugMessage(L"StringCbPrintf error:%d\n", hr); + } +done: + return buffer; +} + + +//---------------------------------------------------------------------------- +// Function: LogDebugMessage +// +// Description: +// Sends a message to the debugger console, if one is attached +// +// Notes: +// Native debugger: windbg, ntsd, cdb, visual studio +// +VOID LogDebugMessage(LPCWSTR format, ...) { + LPWSTR buffer[8192]; + va_list args; + HRESULT hr; + + if (!IsDebuggerPresent()) return; + + va_start(args, format); + hr = StringCbVPrintf(buffer, sizeof(buffer), format, args); + if (SUCCEEDED(hr)) { + OutputDebugString(buffer); + } + va_end(args); +} + +//---------------------------------------------------------------------------- +// Function: SplitStringIgnoreSpaceW +// +// Description: +// splits a null-terminated string based on a delimiter +// +// Returns: +// ERROR_SUCCESS: on success +// error code: otherwise +// +// Notes: +// The tokes are also null-terminated +// Caller should use LocalFree to clear outTokens +// +DWORD SplitStringIgnoreSpaceW( + __in size_t len, + __in_bcount(len) LPCWSTR source, + __in WCHAR deli, + __out size_t* count, + __out_ecount(count) WCHAR*** outTokens) { + + size_t tokenCount = 0; + size_t crtSource; + size_t crtToken = 0; + WCHAR* lpwszTokenStart = NULL; + WCHAR* lpwszTokenEnd = NULL; + WCHAR* lpwszBuffer = NULL; + size_t tokenLength = 0; + size_t cchBufferLength = 0; + WCHAR crt; + WCHAR** tokens = NULL; + enum {BLANK, TOKEN, DELIMITER} State = BLANK; + + for(crtSource = 0; crtSource < len; ++crtSource) { + crt = source[crtSource]; + switch(State) { + case BLANK: // intentional fallthrough + case DELIMITER: + if (crt == deli) { + State = DELIMITER; + } + else if (!iswspace(crt)) { + ++tokenCount; + lpwszTokenEnd = lpwszTokenStart = source + crtSource; + State = TOKEN; + } + else { + State = BLANK; + } + break; + case TOKEN: + if (crt == deli) { + State = DELIMITER; + cchBufferLength += lpwszTokenEnd - lpwszTokenStart + 2; + } + else if (!iswspace(crt)) { + lpwszTokenEnd = source + crtSource; + } + break; + } + } + + if (State == TOKEN) { + cchBufferLength += lpwszTokenEnd - lpwszTokenStart + 2; + } + + LogDebugMessage(L"counted %d [buffer:%d] tokens in %s\n", tokenCount, cchBufferLength, source); + + #define COPY_CURRENT_TOKEN \ + tokenLength = lpwszTokenEnd - lpwszTokenStart + 1; \ + tokens[crtToken] = lpwszBuffer; \ + memcpy(tokens[crtToken], lpwszTokenStart, tokenLength*sizeof(WCHAR)); \ + tokens[crtToken][tokenLength] = L'\0'; \ + lpwszBuffer += (tokenLength+1); \ + ++crtToken; + + if (tokenCount) { + + // We use one contigous memory for both the pointer arrays and the data copy buffers + // We cannot use in-place references (zero-copy) because the function users + // need null-terminated strings for the tokens + + tokens = (WCHAR**) LocalAlloc(LPTR, + sizeof(WCHAR*) * tokenCount + // for the pointers + sizeof(WCHAR) * cchBufferLength); // for the data + + // Data will be copied after the array + lpwszBuffer = (WCHAR*)(((BYTE*)tokens) + (sizeof(WCHAR*) * tokenCount)); + + State = BLANK; + + for(crtSource = 0; crtSource < len; ++crtSource) { + crt = source[crtSource]; + switch(State) { + case DELIMITER: // intentional fallthrough + case BLANK: + if (crt == deli) { + State = DELIMITER; + } + else if (!iswspace(crt)) { + lpwszTokenEnd = lpwszTokenStart = source + crtSource; + State = TOKEN; + } + else { + State = BLANK; + } + break; + case TOKEN: + if (crt == deli) { + COPY_CURRENT_TOKEN; + State = DELIMITER; + } + else if (!iswspace(crt)) { + lpwszTokenEnd = source + crtSource; + } + break; + } + } + + // Copy out last token, if any + if (TOKEN == State) { + COPY_CURRENT_TOKEN; + } + } + + *count = tokenCount; + *outTokens = tokens; + + return ERROR_SUCCESS; +} + +//---------------------------------------------------------------------------- +// Function: BuildServiceSecurityDescriptor +// +// Description: +// Builds a security descriptor for an arbitrary object +// +// Returns: +// ERROR_SUCCESS: on success +// error code: otherwise +// +// Notes: +// The SD is a of the self-contained flavor (offsets, not pointers) +// Caller should use LocalFree to clear allocated pSD +// +DWORD BuildServiceSecurityDescriptor( + __in ACCESS_MASK accessMask, + __in size_t grantSidCount, + __in_ecount(grantSidCount) PSID* pGrantSids, + __in size_t denySidCount, + __in_ecount(denySidCount) PSID* pDenySids, + __out PSECURITY_DESCRIPTOR* pSD) { + + DWORD dwError = ERROR_SUCCESS; + int crt = 0; + int len = 0; + EXPLICIT_ACCESS* eas = NULL; + LPWSTR lpszSD = NULL; + ULONG cchSD = 0; + HANDLE hToken = INVALID_HANDLE_VALUE; + DWORD dwBufferSize = 0; + PTOKEN_USER pTokenUser = NULL; + PTOKEN_PRIMARY_GROUP pTokenGroup = NULL; + PSECURITY_DESCRIPTOR pTempSD = NULL; + ULONG cbSD = 0; + TRUSTEE owner, group; + + ZeroMemory(&owner, sizeof(owner)); + + // We'll need our own SID to add as SD owner + if (!OpenProcessToken(GetCurrentProcess(), TOKEN_QUERY, &hToken)) { + dwError = GetLastError(); + goto done; + } + + if (!GetTokenInformation(hToken, TokenUser, NULL, 0, &dwBufferSize)) { + dwError = GetLastError(); + if (ERROR_INSUFFICIENT_BUFFER != dwError) { + goto done; + } + } + + pTokenUser = (PTOKEN_USER) LocalAlloc(LPTR, dwBufferSize); + if (NULL == pTokenUser) { + dwError = GetLastError(); + goto done; + } + + if (!GetTokenInformation(hToken, TokenUser, pTokenUser, dwBufferSize, &dwBufferSize)) { + dwError = GetLastError(); + goto done; + } + + if (!IsValidSid(pTokenUser->User.Sid)) { + dwError = ERROR_INVALID_PARAMETER; + goto done; + } + + dwBufferSize = 0; + if (!GetTokenInformation(hToken, TokenPrimaryGroup, NULL, 0, &dwBufferSize)) { + dwError = GetLastError(); + if (ERROR_INSUFFICIENT_BUFFER != dwError) { + goto done; + } + } + + pTokenGroup = (PTOKEN_USER) LocalAlloc(LPTR, dwBufferSize); + if (NULL == pTokenUser) { + dwError = GetLastError(); + goto done; + } + + if (!GetTokenInformation(hToken, TokenPrimaryGroup, pTokenGroup, dwBufferSize, &dwBufferSize)) { + dwError = GetLastError(); + goto done; + } + + if (!IsValidSid(pTokenGroup->PrimaryGroup)) { + dwError = ERROR_INVALID_PARAMETER; + goto done; + } + + owner.TrusteeForm = TRUSTEE_IS_SID; + owner.TrusteeType = TRUSTEE_IS_UNKNOWN; + owner.ptstrName = (LPCWSTR) pTokenUser->User.Sid; + + group.TrusteeForm = TRUSTEE_IS_SID; + group.TrusteeType = TRUSTEE_IS_UNKNOWN; + group.ptstrName = (LPCWSTR) pTokenGroup->PrimaryGroup; + + eas = (EXPLICIT_ACCESS*) alloca(sizeof(EXPLICIT_ACCESS) * (grantSidCount + denySidCount)); + + // Build the granted list + for (crt = 0; crt < grantSidCount; ++crt) { + eas[crt].grfAccessPermissions = accessMask; + eas[crt].grfAccessMode = GRANT_ACCESS; + eas[crt].grfInheritance = NO_INHERITANCE; + eas[crt].Trustee.TrusteeForm = TRUSTEE_IS_SID; + eas[crt].Trustee.TrusteeType = TRUSTEE_IS_UNKNOWN; + eas[crt].Trustee.ptstrName = (LPCWSTR) pGrantSids[crt]; + } + + // Build the deny list + for (; crt < grantSidCount + denySidCount; ++crt) { + eas[crt].grfAccessPermissions = accessMask; + eas[crt].grfAccessMode = DENY_ACCESS; + eas[crt].grfInheritance = NO_INHERITANCE; + eas[crt].Trustee.TrusteeForm = TRUSTEE_IS_SID; + eas[crt].Trustee.TrusteeType = TRUSTEE_IS_UNKNOWN; + eas[crt].Trustee.ptstrName = (LPCWSTR) pDenySids[crt - grantSidCount]; + } + + dwError = BuildSecurityDescriptor( + &owner, + &group, + crt, + eas, + 0, // cCountOfAuditEntries + NULL, // pListOfAuditEntries + NULL, // pOldSD + &cbSD, + &pTempSD); + if (ERROR_SUCCESS != dwError) { + goto done; + } + + *pSD = pTempSD; + pTempSD = NULL; + + if (IsDebuggerPresent()) { + ConvertSecurityDescriptorToStringSecurityDescriptor(*pSD, + SDDL_REVISION_1, + DACL_SECURITY_INFORMATION, + &lpszSD, + &cchSD); + LogDebugMessage(L"pSD: %.*s\n", cchSD, lpszSD); + } + +done: + if (pTokenUser) LocalFree(pTokenUser); + if (INVALID_HANDLE_VALUE != hToken) CloseHandle(hToken); + if (lpszSD) LocalFree(lpszSD); + if (pTempSD) LocalFree(pTempSD); + return dwError; +} + + +//---------------------------------------------------------------------------- +// Function: MIDL_user_allocate +// +// Description: +// Hard-coded function name used by RPC midl code for allocations +// +// Notes: +// Must match the de-allocation mechanism used in MIDL_user_free +// +void __RPC_FAR * __RPC_USER MIDL_user_allocate(size_t len) +{ + return LocalAlloc(LPTR, len); +} + + //---------------------------------------------------------------------------- + // Function: MIDL_user_free + // + // Description: + // Hard-coded function name used by RPC midl code for deallocations + // + // NoteS: + // Must match the allocation mechanism used in MIDL_user_allocate + // +void __RPC_USER MIDL_user_free(void __RPC_FAR * ptr) +{ + LocalFree(ptr); +} + diff --git hadoop-common-project/hadoop-common/src/main/winutils/libwinutils.vcxproj hadoop-common-project/hadoop-common/src/main/winutils/libwinutils.vcxproj index fc0519d..63d936b 100644 --- hadoop-common-project/hadoop-common/src/main/winutils/libwinutils.vcxproj +++ hadoop-common-project/hadoop-common/src/main/winutils/libwinutils.vcxproj @@ -160,11 +160,22 @@ + + + + + + + + true + X64 + + diff --git hadoop-common-project/hadoop-common/src/main/winutils/main.c hadoop-common-project/hadoop-common/src/main/winutils/main.c index 0f40774..e43b6ac 100644 --- hadoop-common-project/hadoop-common/src/main/winutils/main.c +++ hadoop-common-project/hadoop-common/src/main/winutils/main.c @@ -67,6 +67,10 @@ int wmain(__in int argc, __in_ecount(argc) wchar_t* argv[]) { return SystemInfo(); } + else if (wcscmp(L"service", cmd) == 0) + { + return RunService(argc - 1, argv + 1); + } else if (wcscmp(L"help", cmd) == 0) { Usage(argv[0]); @@ -119,5 +123,9 @@ The available commands and their usages are:\n\n", program); fwprintf(stdout, L"%-15s%s\n\n", L"task", L"Task operations."); TaskUsage(); + + fwprintf(stdout, L"%-15s%s\n\n", L"service", L"Service operations."); + ServiceUsage(); + fwprintf(stdout, L"\n\n"); } diff --git hadoop-common-project/hadoop-common/src/main/winutils/service.c hadoop-common-project/hadoop-common/src/main/winutils/service.c new file mode 100644 index 0000000..00f6857 --- /dev/null +++ hadoop-common-project/hadoop-common/src/main/winutils/service.c @@ -0,0 +1,883 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with this +* work for additional information regarding copyright ownership. The ASF +* licenses this file to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +* License for the specific language governing permissions and limitations under +* the License. +*/ + +#include "winutils.h" +#include "winutils_msg.h" +#include +#include +#include +#include +#include +#include +#include "hdpwinutilsvc_h.h" + +#pragma comment(lib, "Rpcrt4.lib") +#pragma comment(lib, "advapi32.lib") +#pragma comment(lib, "authz.lib") + + +#define NM_WSCE_ALLOWED L"yarn.nodemanager.windows-secure-container-executor.allowed" +#define NM_WSCE_DENIED L"yarn.nodemanager.windows-secure-container-executor.denied" + +#define SERVICE_ACCESS_MASK 0x00000001 + +SERVICE_STATUS gSvcStatus; +SERVICE_STATUS_HANDLE gSvcStatusHandle; +HANDLE ghSvcStopEvent = INVALID_HANDLE_VALUE; +HANDLE ghWaitObject = INVALID_HANDLE_VALUE; +HANDLE ghEventLog = INVALID_HANDLE_VALUE; +BOOL isListenning = FALSE; +PSECURITY_DESCRIPTOR pAllowedSD = NULL; + +VOID SvcError(DWORD dwError); +VOID WINAPI SvcMain(DWORD dwArg, LPTSTR* lpszArgv); +DWORD SvcInit(); +DWORD RpcInit(); +DWORD AuthInit(); +VOID ReportSvcStatus( DWORD dwCurrentState, + DWORD dwWin32ExitCode, + DWORD dwWaitHint); +VOID WINAPI SvcCtrlHandler( DWORD dwCtrl ); +VOID CALLBACK SvcShutdown( + _In_ PVOID lpParameter, + _In_ BOOLEAN TimerOrWaitFired); + +#define CHECK_ERROR_DONE(status, expected, category, message) \ + if (status != expected) { \ + ReportSvcCheckError( \ + EVENTLOG_ERROR_TYPE, \ + category, \ + status, \ + message); \ + goto done; \ + } else { \ + LogDebugMessage(L"%s: OK\n", message); \ + } + + +#define CHECK_RPC_STATUS_DONE(status, message) \ + CHECK_ERROR_DONE(status, RPC_S_OK, SERVICE_CATEGORY, message) + +#define CHECK_SVC_STATUS_DONE(status, message) \ + CHECK_ERROR_DONE(status, ERROR_SUCCESS, SERVICE_CATEGORY, message) + +#define CHECK_UNWIND_RPC(rpcCall) { \ + unwindStatus = rpcCall; \ + if (RPC_S_OK != unwindStatus) { \ + ReportSvcCheckError( \ + EVENTLOG_WARNING_TYPE, \ + SERVICE_CATEGORY, \ + unwindStatus, \ + L#rpcCall); \ + } \ + } + + +//---------------------------------------------------------------------------- +// Function: ReportSvcCheckError +// +// Description: +// Reports an error with the system event log and to debugger console (if present) +// +void ReportSvcCheckError(WORD type, WORD category, DWORD dwError, LPCWSTR message) { + int len; + LPWSTR systemMsg = NULL; + LPWSTR appMsg = NULL; + DWORD dwReportError; + LPWSTR reportMsg = NULL; + WCHAR hexError[32]; + LPCWSTR inserts[] = {message, NULL, NULL, NULL}; + HRESULT hr; + + hr = StringCbPrintf(hexError, sizeof(hexError), TEXT("%x"), dwError); + if (SUCCEEDED(hr)) { + inserts[1] = hexError; + } + else { + inserts[1] = L"(Failed to format dwError as string)"; + } + + len = FormatMessageW( + FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM, + NULL, dwError, + MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), + (LPWSTR)&systemMsg, 0, NULL); + + if (len) { + inserts[2] = systemMsg; + } + else { + inserts[2] = L"(Failed to get the system error message)"; + } + + LogDebugMessage(L"%s:%d %.*s\n", message, dwError, len, systemMsg); + + if (INVALID_HANDLE_VALUE != ghEventLog) { + if (!ReportEvent(ghEventLog, type, category, MSG_CHECK_ERROR, + NULL, // lpUserSid + (WORD) 3, // wNumStrings + (DWORD) 0, // dwDataSize + inserts, // *lpStrings + NULL // lpRawData + )) { + // We tried to report and failed. Send to dbg. + dwReportError = GetLastError(); + len = FormatMessageW( + FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM, + NULL, dwReportError, + MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), + (LPWSTR)&reportMsg, 0, NULL); + LogDebugMessage(L"ReportEvent: Error:%d %.*s\n", dwReportError, reportMsg); + } + }; + + if (NULL != systemMsg) LocalFree(systemMsg); + if (NULL != reportMsg) LocalFree(reportMsg); +} + + +VOID ReportSvcMessage(WORD type, WORD category, DWORD msgId) { + DWORD dwError; + + if (INVALID_HANDLE_VALUE != ghEventLog) { + if (!ReportEvent(ghEventLog, type, category, msgId, + NULL, // lpUserSid + (WORD) 0, // wNumStrings + (DWORD) 0, // dwDataSize + NULL, // *lpStrings + NULL // lpRawData + )) { + // We tried to report and failed but debugger is attached. Send to dbg. + dwError = GetLastError(); + LogDebugMessage(L"ReportEvent: error %d\n", dwError); + } + } +} + + +//---------------------------------------------------------------------------- +// Function: RunService +// +// Description: +// Registers with NT SCM and starts the service +// +// Returns: +// ERROR_SUCCESS: On success +// Error code otherwise: otherwise +DWORD RunService(__in int argc, __in_ecount(argc) wchar_t *argv[]) +{ + DWORD dwError= ERROR_SUCCESS; + int argStart = 1; + + static const SERVICE_TABLE_ENTRY serviceTable[] = { + { SVCNAME, (LPSERVICE_MAIN_FUNCTION) SvcMain }, + { NULL, NULL } + }; + + dwError = AuthInit(); + if (ERROR_SUCCESS != dwError) { + SvcError(dwError); + goto done; + } + + ghEventLog = RegisterEventSource(NULL, SVCNAME); + if (NULL == ghEventLog) { + dwError = GetLastError(); + CHECK_SVC_STATUS_DONE(dwError, L"RegisterEventSource") + } + + if (!StartServiceCtrlDispatcher(serviceTable)) { + dwError = GetLastError(); + CHECK_SVC_STATUS_DONE(dwError, L"StartServiceCtrlDispatcher") + } + +done: + return dwError; +} + +//---------------------------------------------------------------------------- +// Function: SvcMain +// +// Description: +// Service main entry point. +// +VOID WINAPI SvcMain() { + DWORD dwError = ERROR_SUCCESS; + + gSvcStatusHandle = RegisterServiceCtrlHandler( + SVCNAME, + SvcCtrlHandler); + if( !gSvcStatusHandle ) { + dwError = GetLastError(); + CHECK_SVC_STATUS_DONE(dwError, L"RegisterServiceCtrlHandler") + } + + // These SERVICE_STATUS members remain as set here + gSvcStatus.dwServiceType = SERVICE_WIN32_OWN_PROCESS; + gSvcStatus.dwServiceSpecificExitCode = 0; + + // Report initial status to the SCM + ReportSvcStatus( SERVICE_START_PENDING, NO_ERROR, 3000 ); + + // Perform service-specific initialization and work. + dwError = SvcInit(); + +done: + return; +} + +//---------------------------------------------------------------------------- +// Function: SvcInit +// +// Description: +// Initializes the service. +// +DWORD SvcInit() { + DWORD dwError = ERROR_SUCCESS; + + dwError = EnablePrivilege(SE_DEBUG_NAME); + if( dwError != ERROR_SUCCESS ) { + goto done; + } + + // The recommended way to shutdown the service is to use an event + // and attach a callback with RegisterWaitForSingleObject + // + ghSvcStopEvent = CreateEvent( + NULL, // default security attributes + TRUE, // manual reset event + FALSE, // not signaled + NULL); // no name + + if ( ghSvcStopEvent == NULL) + { + dwError = GetLastError(); + ReportSvcCheckError(EVENTLOG_ERROR_TYPE, SERVICE_CATEGORY, + dwError, L"CreateEvent"); + ReportSvcStatus( SERVICE_STOPPED, dwError, 0 ); + goto done; + } + + if (!RegisterWaitForSingleObject (&ghWaitObject, + ghSvcStopEvent, + SvcShutdown, + NULL, + INFINITE, + WT_EXECUTEONLYONCE)) { + dwError = GetLastError(); + ReportSvcCheckError(EVENTLOG_ERROR_TYPE, SERVICE_CATEGORY, + dwError, L"RegisterWaitForSingleObject"); + CloseHandle(ghSvcStopEvent); + ReportSvcStatus( SERVICE_STOPPED, dwError, 0 ); + goto done; + } + + // Report running status when initialization is complete. + ReportSvcStatus( SERVICE_RUNNING, NO_ERROR, 0 ); + + dwError = RpcInit(); + +done: + return dwError; +} + +//---------------------------------------------------------------------------- +// Function: RpcAuthorizeCallback +// +// Description: +// RPC Authorization callback. +// +// Returns: +// RPC_S_OK for access authorized +// RPC_S_ACCESS_DENIED for access denied +// +RPC_STATUS CALLBACK RpcAuthorizeCallback ( + RPC_IF_HANDLE hInterface, + void* pContext) +{ + RPC_STATUS status, + unwindStatus, + authStatus = RPC_S_ACCESS_DENIED; + DWORD dwError; + LUID luidReserved2; + AUTHZ_ACCESS_REQUEST request; + AUTHZ_ACCESS_REPLY reply; + AUTHZ_CLIENT_CONTEXT_HANDLE hClientContext = NULL; + DWORD authError = ERROR_SUCCESS; + DWORD saclResult = 0; + ACCESS_MASK grantedMask = 0; + + ZeroMemory(&luidReserved2, sizeof(luidReserved2)); + ZeroMemory(&request, sizeof(request)); + ZeroMemory(&reply, sizeof(reply)); + + status = RpcGetAuthorizationContextForClient(NULL, + FALSE, // ImpersonateOnReturn + NULL, // Reserved1 + NULL, // pExpirationTime + luidReserved2, // Reserved2 + 0, // Reserved3 + NULL, // Reserved4 + &hClientContext); + CHECK_RPC_STATUS_DONE(status, L"RpcGetAuthorizationContextForClient"); + + request.DesiredAccess = MAXIMUM_ALLOWED; + reply.Error = &authError; + reply.SaclEvaluationResults = &saclResult; + reply.ResultListLength = 1; + reply.GrantedAccessMask = &grantedMask; + + if (!AuthzAccessCheck( + 0, + hClientContext, + &request, + NULL, // AuditEvent + pAllowedSD, + NULL, // OptionalSecurityDescriptorArray + 0, // OptionalSecurityDescriptorCount + &reply, + NULL // phAccessCheckResults + )) { + dwError = GetLastError(); + CHECK_SVC_STATUS_DONE(dwError, L"AuthzAccessCheck"); + } + + LogDebugMessage(L"AutzAccessCheck: Error:%d sacl:%d access:%d\n", + authError, saclResult, grantedMask); + if (authError == ERROR_SUCCESS && (grantedMask & SERVICE_ACCESS_MASK)) { + authStatus = RPC_S_OK; + } + +done: + if (NULL != hClientContext) CHECK_UNWIND_RPC(RpcFreeAuthorizationContext(&hClientContext)); + return authStatus; +} + +//---------------------------------------------------------------------------- +// Function: AuthInit +// +// Description: +// Initializes the authorization structures (security descriptor). +// +// Notes: +// This is called from RunService solely for debugging purposed +// so that it can be tested by wimply running winutil service from CLI (no SCM) +// +DWORD AuthInit() { + DWORD dwError = ERROR_SUCCESS; + int count = 0; + int crt = 0; + int len = 0; + LPCWSTR value = NULL; + WCHAR** tokens = NULL; + LPWSTR lpszSD = NULL; + ULONG cchSD = 0; + DWORD dwBufferSize = 0; + int allowedCount = 0; + PSID* allowedSids = NULL; + + + dwError = GetConfigValue(NM_WSCE_ALLOWED, &len, &value); + CHECK_SVC_STATUS_DONE(dwError, L"GetConfigValue"); + + if (0 == len) { + CHECK_SVC_STATUS_DONE(ERROR_BAD_CONFIGURATION, NM_WSCE_ALLOWED); + } + + dwError = SplitStringIgnoreSpaceW(len, value, L',', &count, &tokens); + CHECK_SVC_STATUS_DONE(dwError, L"SplitStringIgnoreSpaceW"); + + allowedSids = (PSID*) LocalAlloc(LPTR, sizeof(PSID) * count); + for (crt = 0; crt < count; ++crt) { + dwError = GetSidFromAcctNameW(tokens[crt], &allowedSids[crt]); + CHECK_SVC_STATUS_DONE(dwError, L"GetSidFromAcctNameW"); + } + + allowedCount = count; + + dwError = BuildServiceSecurityDescriptor(SERVICE_ACCESS_MASK, + allowedCount, allowedSids, 0, NULL, &pAllowedSD); + CHECK_SVC_STATUS_DONE(dwError, L"BuildServiceSecurityDescriptor"); + +done: + if (lpszSD) LocalFree(lpszSD); + if (value) LocalFree(value); + if (tokens) LocalFree(tokens); + return dwError; +} + +//---------------------------------------------------------------------------- +// Function: RpcInit +// +// Description: +// Initializes the RPC infrastructure and starts the RPC listenner. +// +DWORD RpcInit() { + RPC_STATUS status; + DWORD dwError; + + status = RpcServerUseProtseqIf(SVCBINDING, + RPC_C_LISTEN_MAX_CALLS_DEFAULT, + Hdpwinutilsvc_v1_0_s_ifspec, + NULL); + if (RPC_S_OK != status) { + ReportSvcCheckError(EVENTLOG_ERROR_TYPE, SERVICE_CATEGORY, + status, L"RpcServerUseProtseqIf"); + SvcError(status); + dwError = status; + goto done; + } + + status = RpcServerRegisterIfEx(Hdpwinutilsvc_v1_0_s_ifspec, + NULL, // MgrTypeUuid + NULL, // MgrEpv + RPC_IF_ALLOW_LOCAL_ONLY, // Flags + RPC_C_LISTEN_MAX_CALLS_DEFAULT, // Max calls + RpcAuthorizeCallback); // Auth callback + + if (RPC_S_OK != status) { + ReportSvcCheckError(EVENTLOG_ERROR_TYPE, SERVICE_CATEGORY, + status, L"RpcServerRegisterIfEx"); + SvcError(status); + dwError = status; + goto done; + } + + status = RpcServerListen(1, RPC_C_LISTEN_MAX_CALLS_DEFAULT, TRUE); + if (RPC_S_ALREADY_LISTENING == status) { + ReportSvcCheckError(EVENTLOG_WARNING_TYPE, SERVICE_CATEGORY, + status, L"RpcServerListen"); + } + else if (RPC_S_OK != status) { + ReportSvcCheckError(EVENTLOG_ERROR_TYPE, SERVICE_CATEGORY, + status, L"RpcServerListen"); + SvcError(status); + dwError = status; + goto done; + } + + isListenning = TRUE; + + ReportSvcMessage(EVENTLOG_INFORMATION_TYPE, SERVICE_CATEGORY, + MSG_RPC_SERVICE_HAS_STARTED); + +done: + return dwError; +} + +//---------------------------------------------------------------------------- +// Function: RpcStop +// +// Description: +// Tears down the RPC infrastructure and stops the RPC listenner. +// +VOID RpcStop() { + RPC_STATUS status; + + if (isListenning) { + + status = RpcMgmtStopServerListening(NULL); + isListenning = FALSE; + + if (RPC_S_OK != status) { + ReportSvcCheckError(EVENTLOG_WARNING_TYPE, SERVICE_CATEGORY, + status, L"RpcMgmtStopServerListening"); + } + + ReportSvcMessage(EVENTLOG_INFORMATION_TYPE, SERVICE_CATEGORY, + MSG_RPC_SERVICE_HAS_STOPPED); + } +} + +//---------------------------------------------------------------------------- +// Function: CleanupHandles +// +// Description: +// Cleans up the global service handles. +// +VOID CleanupHandles() { + if (INVALID_HANDLE_VALUE != ghWaitObject) { + UnregisterWait(ghWaitObject); + ghWaitObject = INVALID_HANDLE_VALUE; + } + if (INVALID_HANDLE_VALUE != ghSvcStopEvent) { + CloseHandle(ghSvcStopEvent); + ghSvcStopEvent = INVALID_HANDLE_VALUE; + } + if (INVALID_HANDLE_VALUE != ghEventLog) { + DeregisterEventSource(ghEventLog); + ghEventLog = INVALID_HANDLE_VALUE; + } +} + +//---------------------------------------------------------------------------- +// Function: SvcError +// +// Description: +// Aborts the startup sequence. Reports error, stops RPC, cleans up globals. +// +VOID SvcError(DWORD dwError) { + RpcStop(); + CleanupHandles(); + ReportSvcStatus( SERVICE_STOPPED, dwError, 0 ); +} + +//---------------------------------------------------------------------------- +// Function: SvcShutdown +// +// Description: +// Callback when the shutdown event is signaled. Stops RPC, cleans up globals. +// +VOID CALLBACK SvcShutdown( + _In_ PVOID lpParameter, + _In_ BOOLEAN TimerOrWaitFired) { + RpcStop(); + CleanupHandles(); + ReportSvcStatus( SERVICE_STOPPED, NO_ERROR, 0 ); +} + +//---------------------------------------------------------------------------- +// Function: SvcCtrlHandler +// +// Description: +// Callback from SCM for for service events (signals). +// +// Notes: +// Shutdown is indirect, we set her the STOP_PENDING state and signal the stop event. +// Signaling the event invokes SvcShutdown which completes the shutdown. +// This two staged approach allows the SCM handler to complete fast, +// not blocking the SCM big fat global lock. +// +VOID WINAPI SvcCtrlHandler( DWORD dwCtrl ) +{ + // Handle the requested control code. + + switch(dwCtrl) + { + case SERVICE_CONTROL_STOP: + ReportSvcStatus(SERVICE_STOP_PENDING, NO_ERROR, 0); + + // Signal the service to stop. + SetEvent(ghSvcStopEvent); + + return; + + default: + break; + } + +} + +//---------------------------------------------------------------------------- +// Function: ReportSvcStatus +// +// Description: +// Updates the service status with the SCM. +// +VOID ReportSvcStatus( DWORD dwCurrentState, + DWORD dwWin32ExitCode, + DWORD dwWaitHint) +{ + static DWORD dwCheckPoint = 1; + DWORD dwError; + + // Fill in the SERVICE_STATUS structure. + + gSvcStatus.dwCurrentState = dwCurrentState; + gSvcStatus.dwWin32ExitCode = dwWin32ExitCode; + gSvcStatus.dwWaitHint = dwWaitHint; + + if (dwCurrentState == SERVICE_START_PENDING) + gSvcStatus.dwControlsAccepted = 0; + else gSvcStatus.dwControlsAccepted = SERVICE_ACCEPT_STOP; + + if ( (dwCurrentState == SERVICE_RUNNING) || + (dwCurrentState == SERVICE_STOPPED) ) + gSvcStatus.dwCheckPoint = 0; + else gSvcStatus.dwCheckPoint = dwCheckPoint++; + + // Report the status of the service to the SCM. + if (!SetServiceStatus( gSvcStatusHandle, &gSvcStatus)) { + dwError = GetLastError(); + ReportSvcCheckError(EVENTLOG_WARNING_TYPE, SERVICE_CATEGORY, + dwError, L"SetServiceStatus"); + }; +} + +//---------------------------------------------------------------------------- +// Function: WinutilsCreateProcessAsUser +// +// Description: +// The RPC midl declared function implementation +// +// Returns: +// ERROR_SUCCESS: On success +// Error code otherwise: otherwise +// +// Notes: +// This is the entry point when the NodeManager does the RPC call +// Note that the RPC call does not do any S4U work. Is simply spawns (suspended) wintutils +// using the right command line and the handles over the spwaned process to the NM +// The actual S4U work occurs in the spawned process, run and monitored by the NM +// +error_status_t WinutilsCreateProcessAsUser( + /* [in] */ int nmPid, + /* [in] */ CREATE_PROCESS_REQUEST *request, + /* [out] */ CREATE_PROCESS_RESPONSE **response) { + + DWORD dwError = ERROR_SUCCESS; + LPCWSTR inserts[] = {request->cwd, request->jobName, request->user, request->pidFile, request->cmdLine, NULL}; + WCHAR winutilsPath[MAX_PATH]; + WCHAR fullCmdLine[32768]; + HANDLE taskStdInRd = INVALID_HANDLE_VALUE, taskStdInWr = INVALID_HANDLE_VALUE, + taskStdOutRd = INVALID_HANDLE_VALUE, taskStdOutWr = INVALID_HANDLE_VALUE, + taskStdErrRd = INVALID_HANDLE_VALUE, taskStdErrWr = INVALID_HANDLE_VALUE, + hNmProcess = INVALID_HANDLE_VALUE, + hDuplicateProcess = INVALID_HANDLE_VALUE, + hDuplicateThread = INVALID_HANDLE_VALUE, + hDuplicateStdIn = INVALID_HANDLE_VALUE, + hDuplicateStdOut = INVALID_HANDLE_VALUE, + hDuplicateStdErr = INVALID_HANDLE_VALUE, + hSelfProcess = INVALID_HANDLE_VALUE; + BOOL fMustCleanupProcess = FALSE; + + HRESULT hr; + STARTUPINFO si; + PROCESS_INFORMATION pi; + SECURITY_ATTRIBUTES saTaskStdInOutErr; + + ZeroMemory( &si, sizeof(si) ); + si.cb = sizeof(si); + ZeroMemory( &pi, sizeof(pi) ); + pi.hProcess = INVALID_HANDLE_VALUE; + pi.hThread = INVALID_HANDLE_VALUE; + ZeroMemory( &saTaskStdInOutErr, sizeof(saTaskStdInOutErr)); + + // NB: GetCurrentProcess returns a pseudo-handle that just so happens + // has the value -1, ie. INVALID_HANDLE_VALUE. It cannot fail. + // + hSelfProcess = GetCurrentProcess(); + + hNmProcess = OpenProcess(PROCESS_DUP_HANDLE, FALSE, nmPid); + if (NULL == hNmProcess) { + dwError = GetLastError(); + goto done; + } + + GetModuleFileName(NULL, winutilsPath, sizeof(winutilsPath)/sizeof(WCHAR)); + dwError = GetLastError(); // Always check after GetModuleFileName for ERROR_INSSUFICIENT_BUFFER + if (dwError) { + ReportSvcCheckError(EVENTLOG_ERROR_TYPE, SERVICE_CATEGORY, + dwError, L"GetModuleFileName"); + goto done; + } + + // NB. We can call CreateProcess("wintuls","task create ...") or we can call + // CreateProcess(NULL, "winutils task create"). Only the second form passes "task" as + // argv[1], as expected by main. First form passes "task" as argv[0] and main fails. + + hr = StringCbPrintf(fullCmdLine, sizeof(fullCmdLine), L"\"%s\" task createAsUser %ls %ls %ls %ls", + winutilsPath, + request->jobName, request->user, request->pidFile, request->cmdLine); + if (FAILED(hr)) { + ReportSvcCheckError(EVENTLOG_ERROR_TYPE, SERVICE_CATEGORY, + hr, L"StringCbPrintf:fullCmdLine"); + goto done; + } + + LogDebugMessage(L"[%ls]: %ls %ls\n", request->cwd, winutilsPath, fullCmdLine); + + // stdin/stdout/stderr redirection is handled here + // We create 3 anonimous named pipes. + // Security attributes are required so that the handles can be inherited. + // We assign one end of the pipe to the process (stdin gets a read end, stdout gets a write end) + // We then duplicate the other end in the NM process, and we close our own handle + // Finally we return the duplicate handle values to the NM + // The NM will attach Java file dscriptors to the duplicated handles and + // read/write them as ordinary Java InputStream/OutputStream objects + + si.dwFlags |= STARTF_USESTDHANDLES; + + saTaskStdInOutErr.nLength = sizeof(SECURITY_ATTRIBUTES); + saTaskStdInOutErr.bInheritHandle = TRUE; + saTaskStdInOutErr.lpSecurityDescriptor = NULL; + + if (!CreatePipe(&taskStdInRd, &taskStdInWr, &saTaskStdInOutErr, 0)) { + dwError = GetLastError(); + goto done; + } + if (!SetHandleInformation(taskStdInWr, HANDLE_FLAG_INHERIT, FALSE)) { + dwError = GetLastError(); + goto done; + } + si.hStdInput = taskStdInRd; + + if (!CreatePipe(&taskStdOutRd, &taskStdOutWr, &saTaskStdInOutErr, 0)) { + dwError = GetLastError(); + goto done; + } + if (!SetHandleInformation(taskStdOutRd, HANDLE_FLAG_INHERIT, FALSE)) { + dwError = GetLastError(); + goto done; + } + si.hStdOutput = taskStdOutWr; + + if (!CreatePipe(&taskStdErrRd, &taskStdErrWr, &saTaskStdInOutErr, 0)) { + dwError = GetLastError(); + goto done; + } + if (!SetHandleInformation(taskStdErrRd, HANDLE_FLAG_INHERIT, FALSE)) { + dwError = GetLastError(); + goto done; + } + si.hStdError = taskStdErrWr; + + if (!CreateProcess( + NULL, // lpApplicationName, + fullCmdLine, // lpCommandLine, + NULL, // lpProcessAttributes, + NULL, // lpThreadAttributes, + TRUE, // bInheritHandles, + CREATE_SUSPENDED, // dwCreationFlags, + NULL, // lpEnvironment, + request->cwd, // lpCurrentDirectory, + &si, // lpStartupInfo + &pi)) { // lpProcessInformation + + dwError = GetLastError(); + ReportSvcCheckError(EVENTLOG_ERROR_TYPE, SERVICE_CATEGORY, + dwError, L"CreateProcess"); + goto done; + } + + fMustCleanupProcess = TRUE; + + LogDebugMessage(L"CreateProcess: pid:%x\n", pi.dwProcessId); + + if (!DuplicateHandle(hSelfProcess, pi.hProcess, hNmProcess, + &hDuplicateProcess, 0, FALSE, DUPLICATE_SAME_ACCESS)) { + dwError = GetLastError(); + LogDebugMessage(L"failed: pi.hProcess\n"); + goto done; + } + + if (!DuplicateHandle(hSelfProcess, pi.hThread, hNmProcess, + &hDuplicateThread, 0, FALSE, DUPLICATE_SAME_ACCESS)) { + dwError = GetLastError(); + LogDebugMessage(L"failed: pi.hThread\n"); + goto done; + } + + if (!DuplicateHandle(hSelfProcess, taskStdInWr, hNmProcess, + &hDuplicateStdIn, 0, FALSE, DUPLICATE_SAME_ACCESS)) { + dwError = GetLastError(); + LogDebugMessage(L"failed: taskStdInWr\n"); + goto done; + } + + if (!DuplicateHandle(hSelfProcess, taskStdOutRd, hNmProcess, + &hDuplicateStdOut, 0, FALSE, DUPLICATE_SAME_ACCESS)) { + dwError = GetLastError(); + LogDebugMessage(L"failed: taskStdOutRd\n"); + goto done; + } + + if (!DuplicateHandle(hSelfProcess, taskStdErrRd, hNmProcess, + &hDuplicateStdErr, 0, FALSE, DUPLICATE_SAME_ACCESS)) { + dwError = GetLastError(); + LogDebugMessage(L"failed: taskStdErrRd\n"); + goto done; + } + + *response = (CREATE_PROCESS_RESPONSE*) MIDL_user_allocate(sizeof(CREATE_PROCESS_RESPONSE)); + if (NULL == *response) { + dwError = ERROR_OUTOFMEMORY; + LogDebugMessage(L"Failed to allocate CREATE_PROCESS_RESPONSE* response\n"); + goto done; + } + + // We're now transfering ownership of the duplicated handles to the caller + // If the RPC call fails *after* this point the handles are leaked inside the NM process + + (*response)->hProcess = hDuplicateProcess; + (*response)->hThread = hDuplicateThread; + (*response)->hStdIn = hDuplicateStdIn; + (*response)->hStdOut = hDuplicateStdOut; + (*response)->hStdErr = hDuplicateStdErr; + + fMustCleanupProcess = FALSE; + +done: + + if (fMustCleanupProcess) { + LogDebugMessage(L"Cleaning process: %d due to error:%d\n", pi.dwProcessId, dwError); + TerminateProcess(pi.hProcess, EXIT_FAILURE); + + // cleanup the duplicate handles inside the NM. + + if (INVALID_HANDLE_VALUE != hDuplicateProcess) { + DuplicateHandle(hNmProcess, hDuplicateProcess, NULL, NULL, 0, FALSE, DUPLICATE_CLOSE_SOURCE); + } + if (INVALID_HANDLE_VALUE != hDuplicateThread) { + DuplicateHandle(hNmProcess, hDuplicateThread, NULL, NULL, 0, FALSE, DUPLICATE_CLOSE_SOURCE); + } + if (INVALID_HANDLE_VALUE != hDuplicateStdIn) { + DuplicateHandle(hNmProcess, hDuplicateStdIn, NULL, NULL, 0, FALSE, DUPLICATE_CLOSE_SOURCE); + } + if (INVALID_HANDLE_VALUE != hDuplicateStdOut) { + DuplicateHandle(hNmProcess, hDuplicateStdOut, NULL, NULL, 0, FALSE, DUPLICATE_CLOSE_SOURCE); + } + if (INVALID_HANDLE_VALUE != hDuplicateStdErr) { + DuplicateHandle(hNmProcess, hDuplicateStdErr, NULL, NULL, 0, FALSE, DUPLICATE_CLOSE_SOURCE); + } + } + + if (INVALID_HANDLE_VALUE != hSelfProcess) CloseHandle(hSelfProcess); + if (INVALID_HANDLE_VALUE != hNmProcess) CloseHandle(hNmProcess); + if (INVALID_HANDLE_VALUE != taskStdInRd) CloseHandle(taskStdInRd); + if (INVALID_HANDLE_VALUE != taskStdInWr) CloseHandle(taskStdInWr); + if (INVALID_HANDLE_VALUE != taskStdOutRd) CloseHandle(taskStdOutRd); + if (INVALID_HANDLE_VALUE != taskStdOutWr) CloseHandle(taskStdOutWr); + if (INVALID_HANDLE_VALUE != taskStdErrRd) CloseHandle(taskStdErrRd); + if (INVALID_HANDLE_VALUE != taskStdErrWr) CloseHandle(taskStdErrWr); + + + // This is closing our own process/thread handles. + // If the transfer was succesfull the NM has its own duplicates (if any) + if (INVALID_HANDLE_VALUE != pi.hThread) CloseHandle(pi.hThread); + if (INVALID_HANDLE_VALUE != pi.hProcess) CloseHandle(pi.hProcess); + + return dwError; +} + +//---------------------------------------------------------------------------- +// Function: ServiceUsage +// +// Description: +// Prints the CLI arguments for service command. +// +void ServiceUsage() +{ + fwprintf(stdout, L"\ + Usage: service\n\ + Starts the impersonation helper service.\n\ + This should be called from the SCM.\n\ + The impersonation helper service must run as a high privileged account (LocalSystem)\n\ + and is used by the NodeManager to spawn secure containers.\n"); +} + + diff --git hadoop-common-project/hadoop-common/src/main/winutils/task.c hadoop-common-project/hadoop-common/src/main/winutils/task.c index 520184b..1e2374d 100644 --- hadoop-common-project/hadoop-common/src/main/winutils/task.c +++ hadoop-common-project/hadoop-common/src/main/winutils/task.c @@ -19,10 +19,18 @@ #include #include #include +#include #define PSAPI_VERSION 1 #pragma comment(lib, "psapi.lib") + +#define NM_WSCE_IMPERSONATE_ALLOWED L"yarn.nodemanager.windows-secure-container-executor.impersonate.allowed" +#define NM_WSCE_IMPERSONATE_DENIED L"yarn.nodemanager.windows-secure-container-executor.impersonate.denied" + +// The S4U impersonation access check mask. Arbitrary value (we use 1 for the service access check) +#define SERVICE_IMPERSONATE_MASK 0x00000002 + #define ERROR_TASK_NOT_ALIVE 1 // This exit code for killed processes is compatible with Unix, where a killed @@ -104,6 +112,221 @@ static BOOL ParseCommandLine(__in int argc, return FALSE; } + +//---------------------------------------------------------------------------- +// Function: BuildImpersonateSecurityDescriptor +// +// Description: +// Builds the security descriptor for the S4U impersonation permissions +// This describes what users can be impersonated and what not +// +// Returns: +// ERROR_SUCCESS: On success +// GetLastError: otherwise +// +DWORD BuildImpersonateSecurityDescriptor(__out PSECURITY_DESCRIPTOR* ppSD) { + DWORD dwError = ERROR_SUCCESS; + size_t countAllowed = 0; + PSID* allowedSids = NULL; + size_t countDenied = 0; + PSID* deniedSids = NULL; + LPCWSTR value = NULL; + WCHAR** tokens = NULL; + size_t len = 0; + size_t count = 0; + int crt = 0; + PSECURITY_DESCRIPTOR pSD = NULL; + + dwError = GetConfigValue(NM_WSCE_IMPERSONATE_ALLOWED, &len, &value); + if (dwError) { + ReportErrorCode(L"GetConfigValue:1", dwError); + goto done; + } + + if (0 == len) { + dwError = ERROR_BAD_CONFIGURATION; + ReportErrorCode(L"GetConfigValue:2", dwError); + goto done; + } + + dwError = SplitStringIgnoreSpaceW(len, value, L',', &count, &tokens); + if (dwError) { + ReportErrorCode(L"SplitStringIgnoreSpaceW:1", dwError); + goto done; + } + + allowedSids = LocalAlloc(LPTR, sizeof(PSID) * count); + if (NULL == allowedSids) { + dwError = GetLastError(); + ReportErrorCode(L"LocalAlloc:1", dwError); + goto done; + } + + for(crt = 0; crt < count; ++crt) { + dwError = GetSidFromAcctNameW(tokens[crt], &allowedSids[crt]); + if (dwError) { + ReportErrorCode(L"GetSidFromAcctNameW:1", dwError); + goto done; + } + } + countAllowed = count; + + LocalFree(tokens); + tokens = NULL; + + LocalFree(value); + value = NULL; + + dwError = GetConfigValue(NM_WSCE_IMPERSONATE_DENIED, &len, &value); + if (dwError) { + ReportErrorCode(L"GetConfigValue:3", dwError); + goto done; + } + + if (0 != len) { + dwError = SplitStringIgnoreSpaceW(len, value, L',', &count, &tokens); + if (dwError) { + ReportErrorCode(L"SplitStringIgnoreSpaceW:2", dwError); + goto done; + } + + deniedSids = LocalAlloc(LPTR, sizeof(PSID) * count); + if (NULL == allowedSids) { + dwError = GetLastError(); + ReportErrorCode(L"LocalAlloc:2", dwError); + goto done; + } + + for(crt = 0; crt < count; ++crt) { + dwError = GetSidFromAcctNameW(tokens[crt], &deniedSids[crt]); + if (dwError) { + ReportErrorCode(L"GetSidFromAcctNameW:2", dwError); + goto done; + } + } + countDenied = count; + } + + dwError = BuildServiceSecurityDescriptor( + SERVICE_IMPERSONATE_MASK, + countAllowed, allowedSids, + countDenied, deniedSids, + &pSD); + + if (dwError) { + ReportErrorCode(L"BuildServiceSecurityDescriptor", dwError); + goto done; + } + + *ppSD = pSD; + pSD = NULL; + +done: + if (pSD) LocalFree(pSD); + if (tokens) LocalFree(tokens); + if (allowedSids) LocalFree(allowedSids); + if (deniedSids) LocalFree(deniedSids); + return dwError; +} + +//---------------------------------------------------------------------------- +// Function: ValidateImpersonateAccessCheck +// +// Description: +// Performs the access check for S4U impersonation +// +// Returns: +// ERROR_SUCCESS: On success +// ERROR_ACCESS_DENIED, GetLastError: otherwise +// +DWORD ValidateImpersonateAccessCheck(__in HANDLE logonHandle) { + DWORD dwError = ERROR_SUCCESS; + PSECURITY_DESCRIPTOR pSD = NULL; + LUID luidUnused; + AUTHZ_ACCESS_REQUEST request; + AUTHZ_ACCESS_REPLY reply; + DWORD authError = ERROR_SUCCESS; + DWORD saclResult = 0; + ACCESS_MASK grantedMask = 0; + AUTHZ_RESOURCE_MANAGER_HANDLE hManager = NULL; + AUTHZ_CLIENT_CONTEXT_HANDLE hAuthzToken = NULL; + + ZeroMemory(&luidUnused, sizeof(luidUnused)); + ZeroMemory(&request, sizeof(request)); + ZeroMemory(&reply, sizeof(reply)); + + dwError = BuildImpersonateSecurityDescriptor(&pSD); + if (dwError) { + ReportErrorCode(L"BuildImpersonateSecurityDescriptor", dwError); + goto done; + } + + request.DesiredAccess = MAXIMUM_ALLOWED; + reply.Error = &authError; + reply.SaclEvaluationResults = &saclResult; + reply.ResultListLength = 1; + reply.GrantedAccessMask = &grantedMask; + + if (!AuthzInitializeResourceManager( + AUTHZ_RM_FLAG_NO_AUDIT, + NULL, // pfnAccessCheck + NULL, // pfnComputeDynamicGroups + NULL, // pfnFreeDynamicGroups + NULL, // szResourceManagerName + &hManager)) { + dwError = GetLastError(); + ReportErrorCode(L"AuthzInitializeResourceManager", dwError); + goto done; + } + + if (!AuthzInitializeContextFromToken( + 0, + logonHandle, + hManager, + NULL, // expiration time + luidUnused, // not used + NULL, // callback args + &hAuthzToken)) { + dwError = GetLastError(); + ReportErrorCode(L"AuthzInitializeContextFromToken", dwError); + goto done; + } + + if (!AuthzAccessCheck( + 0, + hAuthzToken, + &request, + NULL, // AuditEvent + pSD, + NULL, // OptionalSecurityDescriptorArray + 0, // OptionalSecurityDescriptorCount + &reply, + NULL // phAccessCheckResults + )) { + dwError = GetLastError(); + ReportErrorCode(L"AuthzAccessCheck", dwError); + goto done; + } + + LogDebugMessage(L"AutzAccessCheck: Error:%d sacl:%d access:%d\n", + authError, saclResult, grantedMask); + + if (authError != ERROR_SUCCESS) { + ReportErrorCode(L"AuthzAccessCheck:REPLY:1", authError); + dwError = authError; + } + else if (!(grantedMask & SERVICE_IMPERSONATE_MASK)) { + ReportErrorCode(L"AuthzAccessCheck:REPLY:2", ERROR_ACCESS_DENIED); + dwError = ERROR_ACCESS_DENIED; + } + +done: + if (hAuthzToken) AuthzFreeContext(hAuthzToken); + if (hManager) AuthzFreeResourceManager(hManager); + if (pSD) LocalFree(pSD); + return dwError; +} + //---------------------------------------------------------------------------- // Function: CreateTaskImpl // @@ -131,6 +354,13 @@ DWORD CreateTaskImpl(__in_opt HANDLE logonHandle, __in PCWSTR jobObjName,__in PW wchar_t* curr_dir = NULL; FILE *stream = NULL; + if (NULL != logonHandle) { + dwErrorCode = ValidateImpersonateAccessCheck(logonHandle); + if (dwErrorCode) { + return dwErrorCode; + } + } + // Create un-inheritable job object handle and set job object to terminate // when last handle is closed. So winutils.exe invocation has the only open // job object handle. Exit of winutils.exe ensures termination of job object. @@ -305,7 +535,8 @@ DWORD CreateTask(__in PCWSTR jobObjName,__in PWSTR cmdLine) // Returns: // ERROR_SUCCESS: On success // GetLastError: otherwise -DWORD CreateTaskAsUser(__in PCWSTR jobObjName,__in PWSTR user, __in PWSTR pidFilePath, __in PWSTR cmdLine) +DWORD CreateTaskAsUser(__in PCWSTR jobObjName, + __in PCWSTR user, __in PCWSTR pidFilePath, __in PCWSTR cmdLine) { DWORD err = ERROR_SUCCESS; DWORD exitCode = EXIT_FAILURE; diff --git hadoop-common-project/hadoop-common/src/main/winutils/winutils.mc hadoop-common-project/hadoop-common/src/main/winutils/winutils.mc new file mode 100644 index 0000000..a2e30ad --- /dev/null +++ hadoop-common-project/hadoop-common/src/main/winutils/winutils.mc @@ -0,0 +1,46 @@ +; // winutils.mc + +; // EventLog messages for Hadoop winutils service. + + +LanguageNames=(English=0x409:MSG00409) + + +; // The following are the categories of events. + +MessageIdTypedef=WORD + +MessageId=0x1 +SymbolicName=SERVICE_CATEGORY +Language=English +Service Events +. + +MessageId=0x2 +SymbolicName=LOG_CATEGORY +Language=English +Task Events +. + +; // The following are the message definitions. + +MessageIdTypedef=DWORD + +MessageId=0x80 +SymbolicName=MSG_CHECK_ERROR +Language=English +%1. Error %2: %3. +. + +MessageId=0x100 +SymbolicName=MSG_RPC_SERVICE_HAS_STARTED +Language=English +The LPC server is listenning. +. + +MessageId=0x200 +SymbolicName=MSG_RPC_SERVICE_HAS_STOPPED +Language=English +The LPC server has stopped listenning. +. + diff --git hadoop-common-project/hadoop-common/src/main/winutils/winutils.vcxproj hadoop-common-project/hadoop-common/src/main/winutils/winutils.vcxproj index 5b9a195..f216b71 100644 --- hadoop-common-project/hadoop-common/src/main/winutils/winutils.vcxproj +++ hadoop-common-project/hadoop-common/src/main/winutils/winutils.vcxproj @@ -1,5 +1,4 @@  - - @@ -86,11 +84,6 @@ true - - true - - ..\..\..\target\winutils\$(Configuration)\ - false @@ -124,6 +117,10 @@ Console true + + X64 + ..\..\..\target\winutils\$(Configuration)\ + @@ -159,7 +156,40 @@ true + + + $(IntermediateOutputPath) + + + Compiling Messages + mc.exe $(TargetName).mc -z $(TargetName)_msg -r $(IntermediateOutputPath) -h $(IntermediateOutputPath) -U + $(IntermediateOutputPath)\$(TargetName)_msg.rc,$(IntermediateOutputPath)\$(TargetName)_msg.h + + + true + X64 + $(IntermediateOutputPath) + true + true + true + 2 + + + + Midl + ClCompile,ResourceCompile + + + + + + + + + + + @@ -179,4 +209,4 @@ - \ No newline at end of file + diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ProcessTree.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ProcessTree.java index 2f8b84d..1e2d16e 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ProcessTree.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ProcessTree.java @@ -296,7 +296,7 @@ public static boolean isAlive(String pid) { return false; } catch (IOException ioe) { LOG.warn("Error executing shell command " - + Arrays.toString(shexec.getExecString()) + ioe); + + shexec.toString() + ioe); return false; } return (shexec.getExitCode() == 0 ? true : false); @@ -321,7 +321,7 @@ public static boolean isProcessGroupAlive(String pgrpId) { return false; } catch (IOException ioe) { LOG.warn("Error executing shell command " - + Arrays.toString(shexec.getExecString()) + ioe); + + shexec.toString() + ioe); return false; } return (shexec.getExitCode() == 0 ? true : false); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 35b61b8..44adf55 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -868,7 +868,7 @@ */ public static final String NM_WINDOWS_SECURE_CONTAINER_GROUP = NM_PREFIX + "windows-secure-container-executor.group"; - + /** T-file compression types used to compress aggregated logs.*/ public static final String NM_LOG_AGG_COMPRESSION_TYPE = NM_PREFIX + "log-aggregation.compression-type"; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java index 5ec9c4c..991bb8d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java @@ -31,9 +31,11 @@ import java.util.Arrays; import java.util.EnumSet; import java.util.List; +import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.UnsupportedFileSystemException; @@ -41,6 +43,7 @@ import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.Shell.ExitCodeException; +import org.apache.hadoop.util.Shell.ICommandExecutor; import org.apache.hadoop.util.Shell.ShellCommandExecutor; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -183,20 +186,16 @@ public int launchContainer(Container container, // create log dir under app // fork script - ShellCommandExecutor shExec = null; + Shell.ICommandExecutor shExec = null; try { setScriptExecutable(launchDst, userName); setScriptExecutable(sb.getWrapperScriptPath(), userName); - // Setup command to run - String[] command = getRunCommand(sb.getWrapperScriptPath().toString(), - containerIdStr, userName, pidFile, this.getConf()); - - LOG.info("launchContainer: " + Arrays.toString(command)); - shExec = new ShellCommandExecutor( - command, + shExec = buildCommandExecutor(sb.getWrapperScriptPath().toString(), + containerIdStr, userName, pidFile, this.getConf(), new File(containerWorkDir.toUri().getPath()), - container.getLaunchContext().getEnvironment()); // sanitized env + container.getLaunchContext().getEnvironment()); + if (isContainerActive(containerId)) { shExec.execute(); } @@ -241,11 +240,25 @@ public int launchContainer(Container container, } return exitCode; } finally { - ; // + shExec.dispose(); // } return 0; } + protected ICommandExecutor buildCommandExecutor(String wrapperScriptPath, String containerIdStr, + String userName, Path pidFile, Configuration conf, File wordDir, Map environment) + throws IOException { + + String[] command = getRunCommand(wrapperScriptPath, + containerIdStr, userName, pidFile, this.getConf()); + + LOG.info("launchContainer: " + Arrays.toString(command)); + return new ShellCommandExecutor( + command, + wordDir, + environment); + } + protected LocalWrapperScriptBuilder getLocalWrapperScriptBuilder( String containerIdStr, Path containerWorkDir) { return Shell.WINDOWS ? diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/WindowsSecureContainerExecutor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/WindowsSecureContainerExecutor.java index 30beaf8..572d703 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/WindowsSecureContainerExecutor.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/WindowsSecureContainerExecutor.java @@ -17,25 +17,34 @@ */ package org.apache.hadoop.yarn.server.nodemanager; +import java.io.BufferedReader; import java.io.File; import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; import java.io.PrintStream; +import java.io.Reader; import java.net.InetSocketAddress; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.io.nativeio.NativeIO; +import org.apache.hadoop.io.nativeio.NativeIO.WinutilsProcessStub; +import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.util.Shell; +import org.apache.hadoop.util.Shell.ICommandExecutor; import org.apache.hadoop.util.Shell.ShellCommandExecutor; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.LocalWrapperScriptBuilder; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; /** @@ -43,7 +52,7 @@ * */ public class WindowsSecureContainerExecutor extends DefaultContainerExecutor { - + private static final Log LOG = LogFactory .getLog(WindowsSecureContainerExecutor.class); @@ -59,6 +68,126 @@ protected void writeLocalWrapperScript(Path launchDst, Path pidFile, PrintStream pout.format("@call \"%s\"", launchDst); } } + + private static class WintuilsProcessStubExecutor implements Shell.ICommandExecutor { + private WinutilsProcessStub processStub; + private StringBuilder output = new StringBuilder(); + private int exitCode; + + private enum State { + INIT, + RUNNING, + COMPLETE + }; + + private State state;; + + private final String cwd; + private final String jobName; + private final String userName; + private final String pidFile; + private final String cmdLine; + private final Configuration conf; + + public WintuilsProcessStubExecutor( + Configuration conf, + String cwd, + String jobName, + String userName, + String pidFile, + String cmdLine) { + this.conf = conf; + this.cwd = cwd; + this.jobName = jobName; + this.userName = userName; + this.pidFile = pidFile; + this.cmdLine = cmdLine; + this.state = State.INIT; + } + + private void assumeComplete() throws IOException { + if (state != State.COMPLETE) { + throw new IOException("Process is not complete"); + } + } + + public String getOutput () throws IOException { + assumeComplete(); + return output.toString(); + } + + public int getExitCode() throws IOException { + assumeComplete(); + return exitCode; + } + + public void validateResult() throws IOException { + assumeComplete(); + if (0 != exitCode) { + LOG.warn(output.toString()); + throw new IOException("Processs exit code is:" + exitCode); + } + } + + private Thread startStreamReader(final InputStream stream) throws IOException { + Thread streamReaderThread = new Thread() { + + @Override + public void run() { + try + { + BufferedReader rdr = new BufferedReader( + new InputStreamReader(stream)); + + String line = rdr.readLine(); + while((line != null) && !isInterrupted()) { + synchronized(output) { + output.append(line); + output.append(System.getProperty("line.separator")); + } + line = rdr.readLine(); + } + } + catch(Throwable t) { + LOG.error("Error occured reading the process stdout", t); + } + } + }; + streamReaderThread.start(); + return streamReaderThread; + } + + public void execute() throws IOException { + if (state != State.INIT) { + throw new IOException("Process is already started"); + } + processStub = NativeIO.createTaskAsUser(cwd, jobName, userName, pidFile, cmdLine); + state = State.RUNNING; + + Thread stdOutReader = startStreamReader(processStub.getInputStream()); + Thread stdErrReader = startStreamReader(processStub.getErrorStream()); + + try { + processStub.resume(); + processStub.waitFor(); + stdOutReader.join(); + stdErrReader.join(); + } + catch(InterruptedException ie) { + throw new IOException(ie); + } + + exitCode = processStub.exitValue(); + state = State.COMPLETE; + } + + @Override + public void dispose() { + if (processStub != null) { + processStub.dispose(); + } + } + } private String nodeManagerGroup; @@ -125,21 +254,12 @@ public void startLocalizer(Path nmPrivateContainerTokens, copyFile(nmPrivateContainerTokens, tokenDst, user); List command ; - String[] commandArray; - ShellCommandExecutor shExec; File cwdApp = new File(appStorageDir.toString()); LOG.info(String.format("cwdApp: %s", cwdApp)); command = new ArrayList(); - command.add(Shell.WINUTILS); - command.add("task"); - command.add("createAsUser"); - command.add("START_LOCALIZER_" + locId); - command.add(user); - command.add("nul:"); // PID file - //use same jvm as parent File jvm = new File(new File(System.getProperty("java.home"), "bin"), "java.exe"); command.add(jvm.toString()); @@ -160,12 +280,31 @@ public void startLocalizer(Path nmPrivateContainerTokens, } ContainerLocalizer.buildMainArgs(command, user, appId, locId, nmAddr, localDirs); - commandArray = command.toArray(new String[command.size()]); - - shExec = new ShellCommandExecutor( - commandArray, cwdApp); - - shExec.execute(); + + String cmdLine = StringUtils.join(command, " "); + + WintuilsProcessStubExecutor stubExecutor = new WintuilsProcessStubExecutor( + getConf(), + cwdApp.getAbsolutePath(), + "START_LOCALIZER_" + locId, user, "nul:", cmdLine); + try { + stubExecutor.execute(); + stubExecutor.validateResult(); + } + finally { + stubExecutor.dispose(); + } } + + @Override + protected ICommandExecutor buildCommandExecutor(String wrapperScriptPath, String containerIdStr, + String userName, Path pidFile, Configuration conf, File wordDir, Map environment) + throws IOException { + + return new WintuilsProcessStubExecutor( + getConf(), + wordDir.toString(), + containerIdStr, userName, pidFile.toString(), "cmd /c " + wrapperScriptPath); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java index 3525170..762565b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java @@ -369,10 +369,13 @@ public static void main(String[] argv) throws Throwable { new ContainerLocalizer(FileContext.getLocalFSFileContext(), user, appId, locId, localDirs, RecordFactoryProvider.getRecordFactory(null)); - System.exit(localizer.runLocalization(nmAddr)); + int nRet = localizer.runLocalization(nmAddr); + LOG.info(String.format("nRet: %d", nRet)); + System.exit(nRet); } catch (Throwable e) { // Print error to stdout so that LCE can use it. e.printStackTrace(System.out); + LOG.error("Exception in main:", e); throw e; } }