diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java index db8da35..1e220b4 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java @@ -17,11 +17,14 @@ */ package org.apache.hadoop.io.nativeio; +import java.io.BufferedInputStream; import java.io.File; import java.io.FileDescriptor; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.io.RandomAccessFile; import java.lang.reflect.Field; import java.nio.ByteBuffer; @@ -839,4 +842,71 @@ public static void renameTo(File src, File dst) */ private static native void renameTo0(String src, String dst) throws NativeIOException; + + /** + * Wraps a process started by the winutils service helper. + * + */ + public static class WinutilsProcessStub extends Process { + + private final long hProcess; + private final long hThread; + private boolean disposed = false; + + private final InputStream stdErr; + private final InputStream stdOut; + private final OutputStream stdIn; + + public WinutilsProcessStub(long hProcess, long hThread, long hStdIn, long hStdOut, long hStdErr) { + this.hProcess = hProcess; + this.hThread = hThread; + + this.stdIn = new FileOutputStream(getFileDescriptorFromHandle(hStdIn)); + this.stdOut = new FileInputStream(getFileDescriptorFromHandle(hStdOut)); + this.stdErr = new FileInputStream(getFileDescriptorFromHandle(hStdErr)); + } + + private static native FileDescriptor getFileDescriptorFromHandle(long handle); + + @Override + public native void destroy(); + + @Override + public native int exitValue(); + + @Override + public InputStream getErrorStream() { + return stdErr; + } + @Override + public InputStream getInputStream() { + return stdOut; + } + @Override + public OutputStream getOutputStream() { + return stdIn; + } + @Override + public native int waitFor() throws InterruptedException; + + public synchronized native void dispose(); + + public native void resume(); + } + + public synchronized static WinutilsProcessStub createTaskAsUser(String cwd, String jobName, String user, String pidFile, String cmdLine) + throws IOException { + if (!nativeLoaded) { + throw new IOException("NativeIO libraries are required for createTaskAsUser"); + } + synchronized(Shell.WindowsProcessLaunchLock) { + return createTaskAsUser0(cwd, jobName, user, pidFile, cmdLine); + } + } + + private static native WinutilsProcessStub createTaskAsUser0( + String cwd, + String jobName, String user, String pidFile, String cmdLine) + throws NativeIOException; + } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Shell.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Shell.java index fcdc021..67297cd 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Shell.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Shell.java @@ -643,6 +643,18 @@ public String toString() { } } + public interface ICommandExecutor { + + void execute() throws IOException; + + int getExitCode() throws IOException; + + String getOutput() throws IOException; + + void dispose(); + + } + /** * A simple shell command executor. * @@ -651,7 +663,7 @@ public String toString() { * directory and the environment remains unchanged. The output of the command * is stored as-is and is expected to be small. */ - public static class ShellCommandExecutor extends Shell { + public static class ShellCommandExecutor extends Shell implements ICommandExecutor { private String[] command; private StringBuffer output; @@ -743,6 +755,10 @@ public String toString() { } return builder.toString(); } + + @Override + public void dispose() { + } } /** diff --git a/hadoop-common-project/hadoop-common/src/main/native/native.vcxproj b/hadoop-common-project/hadoop-common/src/main/native/native.vcxproj index 0d67e1e..e743788 100644 --- a/hadoop-common-project/hadoop-common/src/main/native/native.vcxproj +++ b/hadoop-common-project/hadoop-common/src/main/native/native.vcxproj @@ -99,6 +99,7 @@ + diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c index 79c9b9d..d8c6005 100644 --- a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c +++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c @@ -49,6 +49,7 @@ #include "file_descriptor.h" #include "errno_enum.h" +#include "winutils_process_stub.h" #define MMAP_PROT_READ org_apache_hadoop_io_nativeio_NativeIO_POSIX_MMAP_PROT_READ #define MMAP_PROT_WRITE org_apache_hadoop_io_nativeio_NativeIO_POSIX_MMAP_PROT_WRITE @@ -191,6 +192,12 @@ Java_org_apache_hadoop_io_nativeio_NativeIO_initNative( errno_enum_init(env); PASS_EXCEPTIONS_GOTO(env, error); #endif + +#ifdef WINDOWS + winutils_process_stub_init(env,clazz); + PASS_EXCEPTIONS_GOTO(env, error); +#endif + return; error: // these are all idempodent and safe to call even if the @@ -203,6 +210,9 @@ error: #ifdef UNIX errno_enum_deinit(env); #endif +#ifdef WINDOWS + winutils_process_stub_deinit(env); +#endif } /* @@ -1008,6 +1018,7 @@ cleanup: #endif } + JNIEXPORT void JNICALL Java_org_apache_hadoop_io_nativeio_NativeIO_renameTo0(JNIEnv *env, jclass clazz, jstring jsrc, jstring jdst) @@ -1063,6 +1074,83 @@ JNIEnv *env, jclass clazz) #endif } + +/* + * Class: org_apache_hadoop_io_nativeio_NativeIO_Windows + * Method: createTaskAsUser + * Signature: (Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String)Lorg/apache/hadoop/io/nativeio/NativeIO$WinutilsProcessStub + */ +JNIEXPORT jobject JNICALL +Java_org_apache_hadoop_io_nativeio_NativeIO_createTaskAsUser0(JNIEnv* env, + jclass clazz, jstring cwd, jstring jobName, jstring user, jstring pidFile, jstring cmdLine) { +#ifdef UNIX + THROW(env, "java/io/IOException", + "The function createTaskAsUser is not supported on Unix"); + return -1; +#endif + +#ifdef WINDOWS + LPCWSTR lpszCwd = NULL, lpszJobName = NULL, lpszUser = NULL, lpszPidFile = NULL, lpszCmdLine = NULL; + DWORD dwError = ERROR_SUCCESS; + HANDLE hProcess = INVALID_HANDLE_VALUE, + hThread = INVALID_HANDLE_VALUE, + hStdIn = INVALID_HANDLE_VALUE, + hStdOut = INVALID_HANDLE_VALUE, + hStdErr = INVALID_HANDLE_VALUE; + jobject ret = NULL; + + lpszCwd = (LPCWSTR) (*env)->GetStringChars(env, cwd, NULL); + if (!lpszCwd) goto done; // exception was thrown + + lpszJobName = (LPCWSTR) (*env)->GetStringChars(env, jobName, NULL); + if (!lpszJobName) goto done; // exception was thrown + + lpszUser = (LPCWSTR) (*env)->GetStringChars(env, user, NULL); + if (!lpszUser) goto done; // exception was thrown + + lpszPidFile = (LPCWSTR) (*env)->GetStringChars(env, pidFile, NULL); + if (!lpszPidFile) goto done; // exception was thrown + + lpszCmdLine = (LPCWSTR) (*env)->GetStringChars(env, cmdLine, NULL); + if (!lpszCmdLine) goto done; // exception was thrown + + dwError = RpcCall_TaskCreateAsUser(lpszCwd, lpszJobName, lpszUser, lpszPidFile, lpszCmdLine, + &hProcess, &hThread, &hStdIn, &hStdOut, &hStdErr); + + if (ERROR_SUCCESS == dwError) { + ret = winutils_process_stub_create(env, (jlong) hProcess, (jlong) hThread, + (jlong) hStdIn, (jlong) hStdOut, (jlong) hStdErr); + + if (NULL == ret) { + TerminateProcess(hProcess, EXIT_FAILURE); + CloseHandle(hThread); + CloseHandle(hProcess); + CloseHandle(hStdIn); + CloseHandle(hStdOut); + CloseHandle(hStdErr); + } + } + +done: + + if (lpszCwd) (*env)->ReleaseStringChars(env, cwd, lpszCwd); + if (lpszJobName) (*env)->ReleaseStringChars(env, jobName, lpszJobName); + if (lpszUser) (*env)->ReleaseStringChars(env, user, lpszUser); + if (lpszPidFile) (*env)->ReleaseStringChars(env, pidFile, lpszPidFile); + if (lpszCmdLine) (*env)->ReleaseStringChars(env, cmdLine, lpszCmdLine); + + if (dwError != ERROR_SUCCESS) { + throw_ioe (env, dwError); + } + + return ret; + +#endif +} + + + + /** * vim: sw=2: ts=2: et: */ diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/winutils_process_stub.c b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/winutils_process_stub.c new file mode 100644 index 0000000..a96660f --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/winutils_process_stub.c @@ -0,0 +1,183 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with this +* work for additional information regarding copyright ownership. The ASF +* licenses this file to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +* License for the specific language governing permissions and limitations under +* the License. +*/ + +#include "jni.h" +#include "org_apache_hadoop.h" +#include "winutils_process_stub.h" +#include "winutils.h" +#include "file_descriptor.h" + +// class of org.apache.hadoop.io.nativeio.NativeIO.WinutilsProcessStub +static jclass wps_class = NULL; + + +static jmethodID wps_constructor = NULL; +static jfieldID wps_hProcess = NULL; +static jfieldID wps_hThread = NULL; +static jfieldID wps_disposed = NULL; + +void winutils_process_stub_init(JNIEnv *env) { + if (wps_class != NULL) return; // already initted + + wps_class = (*env)->FindClass(env, WINUTILS_PROCESS_STUB_CLASS); + PASS_EXCEPTIONS(env); + wps_class = (*env)->NewGlobalRef(env, wps_class); + + wps_hProcess = (*env)->GetFieldID(env, wps_class, "hProcess", "J"); + PASS_EXCEPTIONS(env); + + wps_hThread = (*env)->GetFieldID(env, wps_class, "hThread", "J"); + PASS_EXCEPTIONS(env); + + wps_disposed = (*env)->GetFieldID(env, wps_class, "disposed", "Z"); + PASS_EXCEPTIONS(env); + + wps_constructor = (*env)->GetMethodID(env, wps_class, "", "(JJJJJ)V"); + + LogDebugMessage(L"winutils_process_stub_init\n"); +} + +void winutils_process_stub_deinit(JNIEnv *env) { + if (wps_class != NULL) { + (*env)->DeleteGlobalRef(env, wps_class); + wps_class = NULL; + } + wps_hProcess = NULL; + wps_hThread = NULL; + wps_disposed = NULL; + wps_constructor = NULL; + LogDebugMessage(L"winutils_process_stub_deinit\n"); +} + +jobject winutils_process_stub_create(JNIEnv *env, + jlong hProcess, jlong hThread, jlong hStdIn, jlong hStdOut, jlong hStdErr) { + jobject obj = (*env)->NewObject(env, wps_class, wps_constructor, + hProcess, hThread, hStdIn, hStdOut, hStdErr); + PASS_EXCEPTIONS_RET(env, NULL); + + LogDebugMessage(L"winutils_process_stub_create: %p\n", obj); + + return obj; +} + + +/* + * native void destroy(); + * + * The "00024" in the function name is an artifact of how JNI encodes + * special characters. U+0024 is '$'. + */ +JNIEXPORT void JNICALL +Java_org_apache_hadoop_io_nativeio_NativeIO_00024WinutilsProcessStub_destroy( + JNIEnv *env, jobject objSelf) { + + HANDLE hProcess = (HANDLE)(*env)->GetLongField(env, objSelf, wps_hProcess); + LogDebugMessage(L"TerminateProcess: %x\n", hProcess); + TerminateProcess(hProcess, EXIT_FAILURE); +} + +/* + * native void waitFor(); + * + * The "00024" in the function name is an artifact of how JNI encodes + * special characters. U+0024 is '$'. + */ +JNIEXPORT void JNICALL +Java_org_apache_hadoop_io_nativeio_NativeIO_00024WinutilsProcessStub_waitFor( + JNIEnv *env, jobject objSelf) { + + HANDLE hProcess = (HANDLE)(*env)->GetLongField(env, objSelf, wps_hProcess); + LogDebugMessage(L"WaitForSingleObject: %x\n", hProcess); + WaitForSingleObject(hProcess, INFINITE); +} + + + +/* + * native void resume(); + * + * The "00024" in the function name is an artifact of how JNI encodes + * special characters. U+0024 is '$'. + */ +JNIEXPORT void JNICALL +Java_org_apache_hadoop_io_nativeio_NativeIO_00024WinutilsProcessStub_resume( + JNIEnv *env, jobject objSelf) { + + HANDLE hThread = (HANDLE)(*env)->GetLongField(env, objSelf, wps_hThread); + LogDebugMessage(L"ResumeThread: %x\n", hThread); + ResumeThread(hThread, INFINITE); +} + +/* + * native int exitValue(); + * + * The "00024" in the function name is an artifact of how JNI encodes + * special characters. U+0024 is '$'. + */ +JNIEXPORT jint JNICALL +Java_org_apache_hadoop_io_nativeio_NativeIO_00024WinutilsProcessStub_exitValue( + JNIEnv *env, jobject objSelf) { + + DWORD exitCode; + HANDLE hProcess = (HANDLE)(*env)->GetLongField(env, objSelf, wps_hProcess); + GetExitCodeProcess(hProcess, &exitCode); + LogDebugMessage(L"GetExitCodeProcess: %x :%d\n", hProcess, exitCode); + + return exitCode; +} + + +/* + * native void dispose(); + * + * The "00024" in the function name is an artifact of how JNI encodes + * special characters. U+0024 is '$'. + */ +JNIEXPORT void JNICALL +Java_org_apache_hadoop_io_nativeio_NativeIO_00024WinutilsProcessStub_dispose( + JNIEnv *env, jobject objSelf) { + + HANDLE hProcess, hThread, hStdIn, hStdOut, hStdErr; + + jboolean disposed = (*env)->GetBooleanField(env, objSelf, wps_disposed); + + if (JNI_TRUE != disposed) { + hProcess = (HANDLE)(*env)->GetLongField(env, objSelf, wps_hProcess); + hThread = (HANDLE)(*env)->GetLongField(env, objSelf, wps_hThread); + + CloseHandle(hProcess); + CloseHandle(hThread); + (*env)->SetBooleanField(env, objSelf, wps_disposed, JNI_TRUE); + LogDebugMessage(L"disposed: %p\n", objSelf); + } +} + + +/* + * native static FileDescriptor getFileDescriptorFromHandle(long handle); + * + * The "00024" in the function name is an artifact of how JNI encodes + * special characters. U+0024 is '$'. + */ +JNIEXPORT jobject JNICALL +Java_org_apache_hadoop_io_nativeio_NativeIO_00024WinutilsProcessStub_getFileDescriptorFromHandle( + JNIEnv *env, jclass klass, jlong handle) { + + LogDebugMessage(L"getFileDescriptorFromHandle: %x\n", handle); + return fd_create(env, (long) handle); +} + diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/winutils_process_stub.h b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/winutils_process_stub.h new file mode 100644 index 0000000..6ab8ad6 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/winutils_process_stub.h @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + + +#define WINUTILS_PROCESS_STUB_CLASS "org/apache/hadoop/io/nativeio/NativeIO$WinutilsProcessStub" + +void winutils_process_stub_init(JNIEnv *env); +void winutils_process_stub_deinit(JNIEnv *env); +jobject winutils_process_stub_create(JNIEnv *env, + jlong hProcess, jlong hThread, jlong hStdIn, jlong hStdOut, jlong hStdErr); + + diff --git a/hadoop-common-project/hadoop-common/src/main/winutils/client.c b/hadoop-common-project/hadoop-common/src/main/winutils/client.c new file mode 100644 index 0000000..1c2dc09 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/winutils/client.c @@ -0,0 +1,158 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with this +* work for additional information regarding copyright ownership. The ASF +* licenses this file to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +* License for the specific language governing permissions and limitations under +* the License. +*/ + +#include "winutils.h" +#include +#include +#include "hdpwinutilsvc_h.h" + +#pragma comment(lib, "Rpcrt4.lib") +#pragma comment(lib, "advapi32.lib") + +VOID ReportClientError(DWORD dwError) { + LPWSTR debugMsg = NULL; + int len; + WCHAR hexError[32]; + HRESULT hr; + + if (IsDebuggerPresent()) { + len = FormatMessageW( + FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM, + NULL, dwError, + MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), + (LPWSTR)&debugMsg, 0, NULL); + if (len) { + OutputDebugString(debugMsg); + } + + hr = StringCchPrintf(hexError, sizeof(hexError)/sizeof(WCHAR), TEXT("%x"), dwError); + if (SUCCEEDED(hr)) { + OutputDebugString(hexError); + } + } + + if (NULL != debugMsg) LocalFree(debugMsg); +} + +VOID ReadPipe(HANDLE hPipe) { + LPSTR buffer[4096]; + DWORD cRead = 0; + DWORD dwError; + + for (;;) { + if (!PeekNamedPipe(hPipe, NULL, 0, NULL, &cRead, NULL)) { + dwError = GetLastError(); + ReportClientError(dwError); + break; + } + if (0 == cRead) { + break; + } + if (cRead > sizeof(buffer)) { + cRead = sizeof(buffer); + } + if (!ReadFile(hPipe, buffer, cRead, &cRead, NULL)) { + dwError = GetLastError(); + ReportClientError(dwError); + break; + } + if (0 == cRead) { + break; + } + LogDebugMessage(L"%.*S", cRead, buffer); + } +} + + +DWORD RpcCall_TaskCreateAsUser(LPCWSTR lpszCwd, LPCWSTR lpszJobName, + LPCWSTR lpszUser, LPCWSTR lpszPidFile, LPCWSTR lpszCmdLine, + HANDLE* phProcess, HANDLE* phThread, HANDLE* phStdIn, HANDLE* phStdOut, HANDLE* phStdErr) +{ + DWORD dwError = EXIT_FAILURE; + RPC_STATUS status; + LPWSTR lpszStringBinding = NULL; + ULONG ulCode; + DWORD dwSelfPid = GetCurrentProcessId(); + CREATE_PROCESS_REQUEST request; + CREATE_PROCESS_RESPONSE *response = NULL; + + request.cwd = lpszCwd; + request.jobName = lpszJobName; + request.user = lpszUser; + request.pidFile = lpszPidFile; + request.cmdLine = lpszCmdLine; + + status = RpcStringBindingCompose(NULL, + SVCBINDING, + NULL, + SVCNAME, + NULL, + &lpszStringBinding); + if (RPC_S_OK != status) { + ReportClientError(status); + dwError = status; + goto done; + } + + status = RpcBindingFromStringBinding(lpszStringBinding, &hHdpWinutilsSvcBinding); + + if (RPC_S_OK != status) { + ReportClientError(status); + dwError = status; + goto done; + } + + RpcTryExcept { + dwError = WinutilsCreateProcessAsUser(dwSelfPid, &request, &response); + } + RpcExcept(1) { + ulCode = RpcExceptionCode(); + ReportClientError(ulCode); + dwError = (DWORD) ulCode; + } + RpcEndExcept; + + if (ERROR_SUCCESS == dwError) { + *phProcess = response->hProcess; + *phThread = response->hThread; + *phStdIn = response->hStdIn; + *phStdOut = response->hStdOut; + *phStdErr = response->hStdErr; + } + + // From here on forward we do no change dwError even on RPC cleanup errors + status = RpcBindingFree(&hHdpWinutilsSvcBinding); + if (RPC_S_OK != status) { + ReportClientError(status); + goto done; + } + +done: + if (NULL != response) { + MIDL_user_free(response); + } + + if (NULL != lpszStringBinding) { + status = RpcStringFree(&lpszStringBinding); + if (RPC_S_OK != status) { + ReportClientError(status); + } + } + + return dwError; +} + diff --git a/hadoop-common-project/hadoop-common/src/main/winutils/config.cpp b/hadoop-common-project/hadoop-common/src/main/winutils/config.cpp new file mode 100644 index 0000000..8524f7c --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/winutils/config.cpp @@ -0,0 +1,133 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with this +* work for additional information regarding copyright ownership. The ASF +* licenses this file to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +* License for the specific language governing permissions and limitations under +* the License. +*/ + +#include "winutils.h" +#include +#import "msxml6.dll" + + +#define YARN_SITE_XML_PATH L"%HADOOP_CONF_DIR%\\yarn-site.xml" +#define YARN_DEFAULT_XML_PATH L"%HADOOP_CONF_DIR%\\yarn-default.xml" + + +#define ERROR_CHECK_HRESULT_DONE(hr, message) \ + if (FAILED(hr)) { \ + dwError = (DWORD) hr; \ + LogDebugMessage(L"%s: %x", message, hr); \ + goto done; \ + } + +DWORD GetConfigValue(__in LPCWSTR keyName, + __out size_t* len, __out_bcount(len) LPCWSTR* value) { + + DWORD dwError = ERROR_SUCCESS; + WCHAR xmlPath[MAX_PATH]; + + *len = 0; + *value = NULL; + + if (0 == ExpandEnvironmentStrings(YARN_SITE_XML_PATH, xmlPath, MAX_PATH)) { + dwError = GetLastError(); + goto done; + } + + dwError = GetConfigValueFromXmlFile(xmlPath, keyName, len, value); + if (*len) { + goto done; + } + + if (0 == ExpandEnvironmentStrings(YARN_DEFAULT_XML_PATH, xmlPath, MAX_PATH)) { + dwError = GetLastError(); + goto done; + } + + dwError = GetConfigValueFromXmlFile(xmlPath, keyName, len, value); + +done: + if (*len) { + LogDebugMessage(L"GetConfigValue:%d key:%s len:%d value:%.*s from:%s\n", dwError, keyName, *len, *len, *value, xmlPath); + } + return dwError; +} + + +DWORD GetConfigValueFromXmlFile(__in LPCWSTR xmlFile, __in LPCWSTR keyName, + __out size_t* outLen, __out_bcount(len) LPCWSTR* outValue) { + + DWORD dwError = ERROR_SUCCESS; + HRESULT hr; + WCHAR keyXsl[8192]; + size_t len = 0; + LPWSTR value = NULL; + + *outLen = 0; + *outValue = NULL; + + hr = StringCbPrintf(keyXsl, sizeof(keyXsl), L"//configuration/property[name='%s']/value/text()", keyName); + ERROR_CHECK_HRESULT_DONE(hr, L"StringCbPrintf"); + + hr = CoInitialize(NULL); + ERROR_CHECK_HRESULT_DONE(hr, L"CoInitialize"); + + try { + MSXML2::IXMLDOMDocument2Ptr pDoc; + hr = pDoc.CreateInstance(__uuidof(MSXML2::DOMDocument60), NULL, CLSCTX_INPROC_SERVER); + ERROR_CHECK_HRESULT_DONE(hr, L"CreateInstance"); + + pDoc->async = VARIANT_FALSE; + pDoc->validateOnParse = VARIANT_FALSE; + pDoc->resolveExternals = VARIANT_FALSE; + + _variant_t file(xmlFile); + + if (VARIANT_FALSE == pDoc->load(file)) { + dwError = pDoc->parseError->errorCode; + LogDebugMessage(L"load %s failed:%d %s\n", xmlFile, dwError, + static_cast(pDoc->parseError->Getreason())); + goto done; + } + + MSXML2::IXMLDOMElementPtr pRoot = pDoc->documentElement; + MSXML2::IXMLDOMNodePtr keyNode = pRoot->selectSingleNode(keyXsl); + + if (keyNode) { + _bstr_t bstrValue = static_cast<_bstr_t>(keyNode->nodeValue); + len = bstrValue.length(); + value = (LPWSTR) LocalAlloc(LPTR, (len+1) * sizeof(WCHAR)); + LPCWSTR lpwszValue = static_cast(bstrValue); + memcpy(value, lpwszValue, (len) * sizeof(WCHAR)); + LogDebugMessage(L"key:%s :%.*s [%s]\n", keyName, len, value, lpwszValue); + *outLen = len; + *outValue = value; + } + else { + LogDebugMessage(L"node Xpath:%s not found in:%s\n", keyXsl, xmlFile); + } + } + catch(_com_error errorObject) { + dwError = errorObject.Error(); + LogDebugMessage(L"catch _com_error:%x %s\n", dwError, errorObject.ErrorMessage()); + goto done; + } + +done: + CoUninitialize(); + + return dwError; +} + + diff --git a/hadoop-common-project/hadoop-common/src/main/winutils/hdpwinutilsvc.idl b/hadoop-common-project/hadoop-common/src/main/winutils/hdpwinutilsvc.idl new file mode 100644 index 0000000..2d8c2b3 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/winutils/hdpwinutilsvc.idl @@ -0,0 +1,35 @@ +import "oaidl.idl"; +import "ocidl.idl"; + +[ + uuid(0492311C-1718-4F53-A6EB-86AD7039988D), + version(1.0), + pointer_default(unique), + implicit_handle(handle_t hHdpWinutilsSvcBinding), + endpoint("ncalrpc:[hdpwinutilsvc]"), +] +interface Hdpwinutilsvc +{ + typedef struct { + [string] const wchar_t* cwd; + [string] const wchar_t* jobName; + [string] const wchar_t* user; + [string] const wchar_t* pidFile; + [string] const wchar_t* cmdLine; + } CREATE_PROCESS_REQUEST; + + typedef struct { + LONG_PTR hProcess; + LONG_PTR hThread; + LONG_PTR hStdIn; + LONG_PTR hStdOut; + LONG_PTR hStdErr; + } CREATE_PROCESS_RESPONSE; + + + error_status_t WinutilsCreateProcessAsUser( + [in] int nmPid, + [in] CREATE_PROCESS_REQUEST *request, + [out] CREATE_PROCESS_RESPONSE **response); + +} \ No newline at end of file diff --git a/hadoop-common-project/hadoop-common/src/main/winutils/include/winutils.h b/hadoop-common-project/hadoop-common/src/main/winutils/include/winutils.h index bae754c..e88a519 100644 --- a/hadoop-common-project/hadoop-common/src/main/winutils/include/winutils.h +++ b/hadoop-common-project/hadoop-common/src/main/winutils/include/winutils.h @@ -30,6 +30,10 @@ #include #include +#ifdef __cplusplus +extern "C" { +#endif + enum EXIT_CODE { /* Common success exit code shared among all utilities */ @@ -178,3 +182,43 @@ DWORD LoadUserProfileForLogon(__in HANDLE logonHandle, __out PROFILEINFO * pi); DWORD UnloadProfileForLogon(__in HANDLE logonHandle, __in PROFILEINFO * pi); +DWORD RunService(__in int argc, __in_ecount(argc) wchar_t *argv[]); +void ServiceUsage(); + +VOID LogDebugMessage(LPCWSTR format, ...); + +DWORD SplitStringIgnoreSpaceW(__in size_t len, __in_bcount(len) LPCWSTR source, + __in WCHAR deli, + __out size_t* count, __out_ecount(count) WCHAR*** out); + +DWORD GetConfigValue( + __in LPCWSTR keyName, + __out size_t* len, + __out_bcount(len) LPCWSTR* value); +DWORD GetConfigValueFromXmlFile( + __in LPCWSTR xmlFile, + __in LPCWSTR keyName, + __out size_t* len, + __out_bcount(len) LPCWSTR* value); + +DWORD BuildServiceSecurityDescriptor( + __in ACCESS_MASK accessMask, + __in size_t grantSidCount, + __in_ecount(grantSidCount) PSID* pGrantSids, + __in size_t denySidCount, + __in_ecount(denySidCount) PSID* pDenySids, + __out PSECURITY_DESCRIPTOR* pSD); + + +#define SVCNAME TEXT("hdpwinutilsvc") +#define SVCBINDING TEXT("ncalrpc") + +int RpcCall_TaskCreateAsUser(LPCWSTR lpszCwd, LPCWSTR lpszJobName, + LPCWSTR lpszUser, LPCWSTR lpszPidFile, LPCWSTR lpszCmdLine); + + +#ifdef __cplusplus +} +#endif + + diff --git a/hadoop-common-project/hadoop-common/src/main/winutils/libwinutils.c b/hadoop-common-project/hadoop-common/src/main/winutils/libwinutils.c index 3de458c..2731717 100644 --- a/hadoop-common-project/hadoop-common/src/main/winutils/libwinutils.c +++ b/hadoop-common-project/hadoop-common/src/main/winutils/libwinutils.c @@ -20,8 +20,11 @@ #pragma comment(lib, "Secur32.lib") #pragma comment(lib, "Userenv.lib") #include "winutils.h" +#include +#include #include #include +#include /* * The array of 12 months' three-letter abbreviations @@ -1706,10 +1709,12 @@ void ReportErrorCode(LPCWSTR func, DWORD err) (LPWSTR)&msg, 0, NULL); if (len > 0) { + LogDebugMessage(L"%s error (%d): %s\n", func, err, msg); fwprintf(stderr, L"%s error (%d): %s\n", func, err, msg); } else { + LogDebugMessage(L"%s error code: %d.\n", func, err); fwprintf(stderr, L"%s error code: %d.\n", func, err); } if (msg != NULL) LocalFree(msg); @@ -2026,6 +2031,8 @@ done: return loadProfileStatus; } + + DWORD UnloadProfileForLogon(__in HANDLE logonHandle, __in PROFILEINFO * pi) { DWORD touchProfileStatus = ERROR_ASSERTION_FAILURE; // Failure to set status should trigger error @@ -2046,3 +2053,344 @@ DWORD UnloadProfileForLogon(__in HANDLE logonHandle, __in PROFILEINFO * pi) done: return touchProfileStatus; } + + +//---------------------------------------------------------------------------- +// Function: LogDebugMessage +// +// Description: +// Sends a message to the debugger console, if one is attached +// +// Notes: +// Native debugger: windbg, ntsd, cdb, visual studio +// +VOID LogDebugMessage(LPCWSTR format, ...) { + LPWSTR buffer[8192]; + va_list args; + HRESULT hr; + + if (!IsDebuggerPresent()) return; + + va_start(args, format); + hr = StringCbVPrintf(buffer, sizeof(buffer), format, args); + if (SUCCEEDED(hr)) { + OutputDebugString(buffer); + } + va_end(args); +} + +//---------------------------------------------------------------------------- +// Function: SplitStringIgnoreSpaceW +// +// Description: +// splits a null-terminated string based on a delimiter +// +// Returns: +// ERROR_SUCCESS: on success +// error code: otherwise +// +// Notes: +// The tokes are also null-terminated +// Caller should use LocalFree to clear outTokens +// +DWORD SplitStringIgnoreSpaceW( + __in size_t len, + __in_bcount(len) LPCWSTR source, + __in WCHAR deli, + __out size_t* count, + __out_ecount(count) WCHAR*** outTokens) { + + size_t tokenCount = 0; + size_t crtSource; + size_t crtToken = 0; + WCHAR* lpwszTokenStart = NULL; + WCHAR* lpwszTokenEnd = NULL; + WCHAR* lpwszBuffer = NULL; + size_t tokenLength = 0; + size_t cchBufferLength = 0; + WCHAR crt; + WCHAR** tokens = NULL; + enum {BLANK, TOKEN, DELIMITER} State = BLANK; + + for(crtSource = 0; crtSource < len; ++crtSource) { + crt = source[crtSource]; + switch(State) { + case BLANK: // intentional fallthrough + case DELIMITER: + if (crt == deli) { + State = DELIMITER; + } + else if (!iswspace(crt)) { + ++tokenCount; + lpwszTokenEnd = lpwszTokenStart = source + crtSource; + State = TOKEN; + } + else { + State = BLANK; + } + break; + case TOKEN: + if (crt == deli) { + State = DELIMITER; + cchBufferLength += lpwszTokenEnd - lpwszTokenStart + 2; + } + else if (!iswspace(crt)) { + lpwszTokenEnd = source + crtSource; + } + break; + } + } + + if (State == TOKEN) { + cchBufferLength += lpwszTokenEnd - lpwszTokenStart + 2; + } + + LogDebugMessage(L"counted %d [buffer:%d] tokens in %s\n", tokenCount, cchBufferLength, source); + + #define COPY_CURRENT_TOKEN \ + tokenLength = lpwszTokenEnd - lpwszTokenStart + 1; \ + tokens[crtToken] = lpwszBuffer; \ + memcpy(tokens[crtToken], lpwszTokenStart, tokenLength*sizeof(WCHAR)); \ + tokens[crtToken][tokenLength] = L'\0'; \ + lpwszBuffer += (tokenLength+1); \ + ++crtToken; + + if (tokenCount) { + + // We use one contigous memory for both the pointer arrays and the data copy buffers + // We cannot use in-place references (zero-copy) because the funciton users + // need null-terminated strings for the tokens + + tokens = (WCHAR**) LocalAlloc(LPTR, + sizeof(WCHAR*) * tokenCount + // for the pointers + sizeof(WCHAR) * cchBufferLength); // for the data + + // Data will be copied after the array + lpwszBuffer = (WCHAR*)(((BYTE*)tokens) + (sizeof(WCHAR*) * tokenCount)); + + State = BLANK; + + for(crtSource = 0; crtSource < len; ++crtSource) { + crt = source[crtSource]; + switch(State) { + case DELIMITER: // intentional fallthrough + case BLANK: + if (crt == deli) { + State = DELIMITER; + } + else if (!iswspace(crt)) { + lpwszTokenEnd = lpwszTokenStart = source + crtSource; + State = TOKEN; + } + else { + State = BLANK; + } + break; + case TOKEN: + if (crt == deli) { + COPY_CURRENT_TOKEN; + State = DELIMITER; + } + else if (!iswspace(crt)) { + lpwszTokenEnd = source + crtSource; + } + break; + } + } + + // Copy out last token, if any + if (TOKEN == State) { + COPY_CURRENT_TOKEN; + } + } + + *count = tokenCount; + *outTokens = tokens; + + return ERROR_SUCCESS; +} + +//---------------------------------------------------------------------------- +// Function: BuildServiceSecurityDescriptor +// +// Description: +// Builds a security descriptor for an arbitrary object +// +// Returns: +// ERROR_SUCCESS: on success +// error code: otherwise +// +// Notes: +// The SD is a of the self-contained flavor (offsets, not pointers) +// Caller should use LocalFree to clear allocated pSD +// +DWORD BuildServiceSecurityDescriptor( + __in ACCESS_MASK accessMask, + __in size_t grantSidCount, + __in_ecount(grantSidCount) PSID* pGrantSids, + __in size_t denySidCount, + __in_ecount(denySidCount) PSID* pDenySids, + __out PSECURITY_DESCRIPTOR* pSD) { + + DWORD dwError = ERROR_SUCCESS; + int crt = 0; + int len = 0; + EXPLICIT_ACCESS* eas = NULL; + LPWSTR lpszSD = NULL; + ULONG cchSD = 0; + HANDLE hToken = INVALID_HANDLE_VALUE; + DWORD dwBufferSize = 0; + PTOKEN_USER pTokenUser = NULL; + PTOKEN_PRIMARY_GROUP pTokenGroup = NULL; + PSECURITY_DESCRIPTOR pTempSD = NULL; + ULONG cbSD = 0; + TRUSTEE owner, group; + + ZeroMemory(&owner, sizeof(owner)); + + // We'll need our own SID to add as SD owner + if (!OpenProcessToken(GetCurrentProcess(), TOKEN_QUERY, &hToken)) { + dwError = GetLastError(); + goto done; + } + + if (!GetTokenInformation(hToken, TokenUser, NULL, 0, &dwBufferSize)) { + dwError = GetLastError(); + if (ERROR_INSUFFICIENT_BUFFER != dwError) { + goto done; + } + } + + pTokenUser = (PTOKEN_USER) LocalAlloc(LPTR, dwBufferSize); + if (NULL == pTokenUser) { + dwError = GetLastError(); + goto done; + } + + if (!GetTokenInformation(hToken, TokenUser, pTokenUser, dwBufferSize, &dwBufferSize)) { + dwError = GetLastError(); + goto done; + } + + if (!IsValidSid(pTokenUser->User.Sid)) { + dwError = ERROR_INVALID_PARAMETER; + goto done; + } + + dwBufferSize = 0; + if (!GetTokenInformation(hToken, TokenPrimaryGroup, NULL, 0, &dwBufferSize)) { + dwError = GetLastError(); + if (ERROR_INSUFFICIENT_BUFFER != dwError) { + goto done; + } + } + + pTokenGroup = (PTOKEN_USER) LocalAlloc(LPTR, dwBufferSize); + if (NULL == pTokenUser) { + dwError = GetLastError(); + goto done; + } + + if (!GetTokenInformation(hToken, TokenPrimaryGroup, pTokenGroup, dwBufferSize, &dwBufferSize)) { + dwError = GetLastError(); + goto done; + } + + if (!IsValidSid(pTokenGroup->PrimaryGroup)) { + dwError = ERROR_INVALID_PARAMETER; + goto done; + } + + owner.TrusteeForm = TRUSTEE_IS_SID; + owner.TrusteeType = TRUSTEE_IS_UNKNOWN; + owner.ptstrName = (LPCWSTR) pTokenUser->User.Sid; + + group.TrusteeForm = TRUSTEE_IS_SID; + group.TrusteeType = TRUSTEE_IS_UNKNOWN; + group.ptstrName = (LPCWSTR) pTokenGroup->PrimaryGroup; + + eas = (EXPLICIT_ACCESS*) alloca(sizeof(EXPLICIT_ACCESS) * (grantSidCount + denySidCount)); + + // Build the granted list + for (crt = 0; crt < grantSidCount; ++crt) { + eas[crt].grfAccessPermissions = accessMask; + eas[crt].grfAccessMode = GRANT_ACCESS; + eas[crt].grfInheritance = NO_INHERITANCE; + eas[crt].Trustee.TrusteeForm = TRUSTEE_IS_SID; + eas[crt].Trustee.TrusteeType = TRUSTEE_IS_UNKNOWN; + eas[crt].Trustee.ptstrName = (LPCWSTR) pGrantSids[crt]; + } + + // Build the deny list + for (; crt < grantSidCount + denySidCount; ++crt) { + eas[crt].grfAccessPermissions = accessMask; + eas[crt].grfAccessMode = DENY_ACCESS; + eas[crt].grfInheritance = NO_INHERITANCE; + eas[crt].Trustee.TrusteeForm = TRUSTEE_IS_SID; + eas[crt].Trustee.TrusteeType = TRUSTEE_IS_UNKNOWN; + eas[crt].Trustee.ptstrName = (LPCWSTR) pDenySids[crt - grantSidCount]; + } + + dwError = BuildSecurityDescriptor( + &owner, + &group, + crt, + eas, + 0, // cCountOfAuditEntries + NULL, // pListOfAuditEntries + NULL, // pOldSD + &cbSD, + &pTempSD); + if (ERROR_SUCCESS != dwError) { + goto done; + } + + *pSD = pTempSD; + pTempSD = NULL; + + if (IsDebuggerPresent()) { + ConvertSecurityDescriptorToStringSecurityDescriptor(*pSD, + SDDL_REVISION_1, + DACL_SECURITY_INFORMATION, + &lpszSD, + &cchSD); + LogDebugMessage(L"pSD: %.*s\n", cchSD, lpszSD); + } + +done: + if (pTokenUser) LocalFree(pTokenUser); + if (INVALID_HANDLE_VALUE != hToken) CloseHandle(hToken); + if (lpszSD) LocalFree(lpszSD); + if (pTempSD) LocalFree(pTempSD); + return dwError; +} + + +//---------------------------------------------------------------------------- +// Function: MIDL_user_allocate +// +// Description: +// Hard-coded function name used by RPC midl code for allocations +// +// Notes: +// Must match the de-allocation mechanism used in MIDL_user_free +// +void __RPC_FAR * __RPC_USER MIDL_user_allocate(size_t len) +{ + return LocalAlloc(LPTR, len); +} + + //---------------------------------------------------------------------------- + // Function: MIDL_user_free + // + // Description: + // Hard-coded function name used by RPC midl code for deallocations + // + // NoteS: + // Must match the allocation mechanism used in MIDL_user_allocate + // +void __RPC_USER MIDL_user_free(void __RPC_FAR * ptr) +{ + LocalFree(ptr); +} + diff --git a/hadoop-common-project/hadoop-common/src/main/winutils/libwinutils.vcxproj b/hadoop-common-project/hadoop-common/src/main/winutils/libwinutils.vcxproj index fc0519d..63d936b 100644 --- a/hadoop-common-project/hadoop-common/src/main/winutils/libwinutils.vcxproj +++ b/hadoop-common-project/hadoop-common/src/main/winutils/libwinutils.vcxproj @@ -160,11 +160,22 @@ + + + + + + + + true + X64 + + diff --git a/hadoop-common-project/hadoop-common/src/main/winutils/main.c b/hadoop-common-project/hadoop-common/src/main/winutils/main.c index 0f40774..e43b6ac 100644 --- a/hadoop-common-project/hadoop-common/src/main/winutils/main.c +++ b/hadoop-common-project/hadoop-common/src/main/winutils/main.c @@ -67,6 +67,10 @@ int wmain(__in int argc, __in_ecount(argc) wchar_t* argv[]) { return SystemInfo(); } + else if (wcscmp(L"service", cmd) == 0) + { + return RunService(argc - 1, argv + 1); + } else if (wcscmp(L"help", cmd) == 0) { Usage(argv[0]); @@ -119,5 +123,9 @@ The available commands and their usages are:\n\n", program); fwprintf(stdout, L"%-15s%s\n\n", L"task", L"Task operations."); TaskUsage(); + + fwprintf(stdout, L"%-15s%s\n\n", L"service", L"Service operations."); + ServiceUsage(); + fwprintf(stdout, L"\n\n"); } diff --git a/hadoop-common-project/hadoop-common/src/main/winutils/service.c b/hadoop-common-project/hadoop-common/src/main/winutils/service.c new file mode 100644 index 0000000..89255eb --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/winutils/service.c @@ -0,0 +1,882 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with this +* work for additional information regarding copyright ownership. The ASF +* licenses this file to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +* License for the specific language governing permissions and limitations under +* the License. +*/ + +#include "winutils.h" +#include "winutils_msg.h" +#include +#include +#include +#include +#include +#include +#include "hdpwinutilsvc_h.h" + +#pragma comment(lib, "Rpcrt4.lib") +#pragma comment(lib, "advapi32.lib") +#pragma comment(lib, "authz.lib") + + +#define NM_WSCE_ALLOWED L"yarn.nodemanager.windows-secure-container-executor.allowed" +#define NM_WSCE_DENIED L"yarn.nodemanager.windows-secure-container-executor.denied" + +#define SERVICE_ACCESS_MASK 0x00000001 + +SERVICE_STATUS gSvcStatus; +SERVICE_STATUS_HANDLE gSvcStatusHandle; +HANDLE ghSvcStopEvent = INVALID_HANDLE_VALUE; +HANDLE ghWaitObject = INVALID_HANDLE_VALUE; +HANDLE ghEventLog = INVALID_HANDLE_VALUE; +BOOL gVerbose = TRUE; +BOOL isListenning = FALSE; +int allowedCount = 0; +PSID* allowedSids = NULL; +PSECURITY_DESCRIPTOR pAllowedSD = NULL; + +VOID SvcError(DWORD dwError); +VOID WINAPI SvcMain(DWORD dwArg, LPTSTR* lpszArgv); +VOID SvcInit( DWORD dwArgc, LPTSTR *lpszArgv); +VOID RpcInit(DWORD dwArgc, LPTSTR *lpszArgv); +DWORD AuthInit(); +VOID ReportSvcStatus( DWORD dwCurrentState, + DWORD dwWin32ExitCode, + DWORD dwWaitHint); +VOID WINAPI SvcCtrlHandler( DWORD dwCtrl ); +VOID CALLBACK SvcShutdown( + _In_ PVOID lpParameter, + _In_ BOOLEAN TimerOrWaitFired); + +#define CHECK_ERROR_DONE(status, expected, category, message) \ + if (status != expected) { \ + ReportSvcCheckError( \ + EVENTLOG_ERROR_TYPE, \ + category, \ + status, \ + message); \ + goto done; \ + } else { \ + LogDebugMessage(L"%s: OK\n", message); \ + } + + +#define CHECK_RPC_STATUS_DONE(status, message) \ + CHECK_ERROR_DONE(status, RPC_S_OK, RPC_CATEGORY, message) + +#define CHECK_SVC_STATUS_DONE(status, message) \ + CHECK_ERROR_DONE(status, ERROR_SUCCESS, SERVICE_CATEGORY, message) + +#define CHECK_UNWIND_RPC(rpcCall) { \ + unwindStatus = rpcCall; \ + if (RPC_S_OK != unwindStatus) { \ + ReportSvcCheckError( \ + EVENTLOG_WARNING_TYPE, \ + RPC_CATEGORY, \ + unwindStatus, \ + L#rpcCall); \ + } \ + } + + +void ReportSvcCheckError(WORD type, WORD category, DWORD dwError, LPCWSTR message) { + int len; + LPWSTR systemMsg = NULL; + LPWSTR appMsg = NULL; + DWORD dwReportError; + LPWSTR reportMsg = NULL; + WCHAR hexError[32]; + LPCWSTR inserts[] = {message, NULL, NULL, NULL}; + HRESULT hr; + + hr = StringCbPrintf(hexError, sizeof(hexError), TEXT("%x"), dwError); + if (SUCCEEDED(hr)) { + inserts[1] = hexError; + } + else { + inserts[1] = L"(Failed to format dwError as string)"; + } + + len = FormatMessageW( + FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM, + NULL, dwError, + MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), + (LPWSTR)&systemMsg, 0, NULL); + + if (len) { + inserts[2] = systemMsg; + } + else { + inserts[2] = L"(Failed to get the system error message)"; + } + + LogDebugMessage(L"%s:%d %.*s\n", message, dwError, len, systemMsg); + + if (INVALID_HANDLE_VALUE != ghEventLog) { + if (!ReportEvent(ghEventLog, type, category, MSG_CHECK_ERROR, + NULL, // lpUserSid + (WORD) 3, // wNumStrings + (DWORD) 0, // dwDataSize + inserts, // *lpStrings + NULL // lpRawData + )) { + // We tried to report and failed. Send to dbg. + dwReportError = GetLastError(); + len = FormatMessageW( + FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM, + NULL, dwReportError, + MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), + (LPWSTR)&reportMsg, 0, NULL); + LogDebugMessage(L"ReportEvent: Error:%d %.*s\n", dwReportError, reportMsg); + } + }; + + if (NULL != systemMsg) LocalFree(systemMsg); + if (NULL != reportMsg) LocalFree(reportMsg); +} + + +VOID ReportSvcError(WORD type, WORD category, DWORD msgId, DWORD dwError) { + int len; + LPWSTR systemMsg = NULL; + LPWSTR appMsg = NULL; + DWORD dwReportError; + LPWSTR reportMsg = NULL; + LPWSTR debugMsg = NULL; + WCHAR hexError[32]; + LPWSTR inserts[] = {NULL, NULL, NULL}; + HRESULT hr; + + hr = StringCbPrintf(hexError, sizeof(hexError), TEXT("%x"), dwError); + if (SUCCEEDED(hr)) { + inserts[0] = hexError; + } + else { + inserts[0] = TEXT("(Failed to format dwError as string)"); + OutputDebugString(inserts[0]); + } + + len = FormatMessageW( + FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM, + NULL, dwError, + MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), + (LPWSTR)&systemMsg, 0, NULL); + + if (len) { + inserts[1] = systemMsg; + } + else { + inserts[1] = TEXT("(Failed to get the system error message)"); + OutputDebugString(inserts[1]); + } + + if (INVALID_HANDLE_VALUE != ghEventLog) { + if (!ReportEvent(ghEventLog, type, category, msgId, + NULL, // lpUserSid + (WORD) 2, // wNumStrings + (DWORD) 0, // dwDataSize + inserts, // *lpStrings + NULL // lpRawData + ) && IsDebuggerPresent()) { + // We tried to report and failed but debugger is attached. Send to dbg. + dwReportError = GetLastError(); + len = FormatMessageW( + FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM, + NULL, dwReportError, + MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), + (LPWSTR)&reportMsg, 0, NULL); + if (len) { + OutputDebugString(reportMsg); + } + else { + OutputDebugString(TEXT("Failed to get ReportEvent error message from system")); + } + OutputDebugString(inserts[0]); + OutputDebugString(inserts[1]); + } + } + else if (IsDebuggerPresent()) { + // The eventlog handle is invalid but debugger is attached. Send to dbg. + len = FormatMessageW( + FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_HMODULE | FORMAT_MESSAGE_ARGUMENT_ARRAY, + NULL, msgId, + MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), + (LPWSTR)&debugMsg, 0, inserts); + if (len) { + OutputDebugString(debugMsg); + } + else { + OutputDebugString(TEXT("Failed to format message from process resource table")); + OutputDebugString(inserts[0]); + OutputDebugString(inserts[1]); + } + } + + + if (NULL != debugMsg) LocalFree(debugMsg); + if (NULL != systemMsg) LocalFree(systemMsg); + if (NULL != reportMsg) LocalFree(reportMsg); +} + + +//---------------------------------------------------------------------------- +// Function: RunService +// +// Description: +// Registers with NT SCM and starts the service +// +// Returns: +// ERROR_SUCCESS: On success +// Error code otherwise: otherwise +DWORD RunService(__in int argc, __in_ecount(argc) wchar_t *argv[]) +{ + DWORD dwError= ERROR_SUCCESS; + int argStart = 1; + + static const SERVICE_TABLE_ENTRY serviceTable[] = { + { SVCNAME, (LPSERVICE_MAIN_FUNCTION) SvcMain }, + { NULL, NULL } + }; + + dwError = AuthInit(); + if (ERROR_SUCCESS != dwError) { + SvcError(dwError); + goto done; + } + + ghEventLog = RegisterEventSource(NULL, SVCNAME); + if (NULL == ghEventLog) { + dwError = GetLastError(); + CHECK_SVC_STATUS_DONE(dwError, L"RegisterEventSource") + } + + if (!StartServiceCtrlDispatcher(serviceTable)) { + dwError = GetLastError(); + CHECK_SVC_STATUS_DONE(dwError, L"StartServiceCtrlDispatcher") + } + +done: + return dwError; +} + +VOID WINAPI SvcMain(DWORD dwArgc, LPTSTR* lpszArgv) { + DWORD dwError = ERROR_SUCCESS; + + gSvcStatusHandle = RegisterServiceCtrlHandler( + SVCNAME, + SvcCtrlHandler); + if( !gSvcStatusHandle ) { + dwError = GetLastError(); + CHECK_SVC_STATUS_DONE(dwError, L"RegisterServiceCtrlHandler") + } + + // These SERVICE_STATUS members remain as set here + gSvcStatus.dwServiceType = SERVICE_WIN32_OWN_PROCESS; + gSvcStatus.dwServiceSpecificExitCode = 0; + + // Report initial status to the SCM + ReportSvcStatus( SERVICE_START_PENDING, NO_ERROR, 3000 ); + + // Perform service-specific initialization and work. + SvcInit( dwArgc, lpszArgv ); +done: + ; +} + +VOID SvcInit( DWORD dwArgc, LPTSTR *lpszArgv) { + DWORD dwError = ERROR_SUCCESS; + + + ghSvcStopEvent = CreateEvent( + NULL, // default security attributes + TRUE, // manual reset event + FALSE, // not signaled + NULL); // no name + + if ( ghSvcStopEvent == NULL) + { + dwError = GetLastError(); + ReportSvcError(EVENTLOG_ERROR_TYPE, SERVICE_CATEGORY, + MSG_SVC_CREATE_STOP_EVENT_FAILED, dwError); + ReportSvcStatus( SERVICE_STOPPED, dwError, 0 ); + return; + } + + if (!RegisterWaitForSingleObject (&ghWaitObject, + ghSvcStopEvent, + SvcShutdown, + NULL, + INFINITE, + WT_EXECUTEONLYONCE)) { + dwError = GetLastError(); + ReportSvcError(EVENTLOG_ERROR_TYPE, SERVICE_CATEGORY, + MSG_SVC_REGISTER_STOP_WAIT_FUNCTION_FAILED, dwError); + CloseHandle(ghSvcStopEvent); + ReportSvcStatus( SERVICE_STOPPED, dwError, 0 ); + return; + } + + // Report running status when initialization is complete. + ReportSvcStatus( SERVICE_RUNNING, NO_ERROR, 0 ); + + RpcInit(dwArgc, lpszArgv); +} + +RPC_STATUS CALLBACK RpcAuthorizeCallback ( + RPC_IF_HANDLE hInterface, + void* pContext) { + RPC_STATUS status, unwindStatus, authStatus = RPC_S_ACCESS_DENIED; + DWORD dwError; + LUID luidReserved2; + AUTHZ_ACCESS_REQUEST request; + AUTHZ_ACCESS_REPLY reply; + AUTHZ_CLIENT_CONTEXT_HANDLE hClientContext = NULL; + DWORD authError = ERROR_SUCCESS; + DWORD saclResult = 0; + ACCESS_MASK grantedMask = 0; + + ZeroMemory(&luidReserved2, sizeof(luidReserved2)); + ZeroMemory(&request, sizeof(request)); + ZeroMemory(&reply, sizeof(reply)); + + if (gVerbose) { + if (!ReportEvent(ghEventLog, EVENTLOG_INFORMATION_TYPE, + RPC_CATEGORY, MSG_RPC_AUTHENTICATE_CLIENT, + NULL, // lpUserSid + (WORD) 0, // wNumStrings + (DWORD) 0, // dwDataSize + NULL, // *lpStrings + NULL)) // lpRawData + { + dwError = GetLastError(); + ReportSvcError(EVENTLOG_WARNING_TYPE, RPC_CATEGORY, + MSG_RPC_CREATE_TASK_AS_USER_LOG_FAILED, dwError); + } + } + + status = RpcGetAuthorizationContextForClient(NULL, + FALSE, // ImpersonateOnReturn + NULL, // Reserved1 + NULL, // pExpirationTime + luidReserved2, // Reserved2 + 0, // Reserved3 + NULL, // Reserved4 + &hClientContext); + CHECK_RPC_STATUS_DONE(status, L"RpcGetAuthorizationContextForClient"); + + request.DesiredAccess = MAXIMUM_ALLOWED; + reply.Error = &authError; + reply.SaclEvaluationResults = &saclResult; + reply.ResultListLength = 1; + reply.GrantedAccessMask = &grantedMask; + + if (!AuthzAccessCheck( + 0, + hClientContext, + &request, + NULL, // AuditEvent + pAllowedSD, + NULL, // OptionalSecurityDescriptorArray + 0, // OptionalSecurityDescriptorCount + &reply, + NULL // phAccessCheckResults + )) { + dwError = GetLastError(); + CHECK_SVC_STATUS_DONE(dwError, L"AuthzAccessCheck"); + } + + LogDebugMessage(L"AutzAccessCheck: Error:%d sacl:%d access:%d\n", + authError, saclResult, grantedMask); + if (authError == ERROR_SUCCESS && (grantedMask & SERVICE_ACCESS_MASK)) { + authStatus = RPC_S_OK; + } + +done: + if (NULL != hClientContext) CHECK_UNWIND_RPC(RpcFreeAuthorizationContext(&hClientContext)); + return authStatus; +} + +DWORD AuthInit() { + DWORD dwError = ERROR_SUCCESS; + int count = 0; + int crt = 0; + int len = 0; + WCHAR siteXmlPath[MAX_PATH]; + WCHAR defaultXmlPath[MAX_PATH]; + LPCWSTR value = NULL; + WCHAR** tokens = NULL; + EXPLICIT_ACCESS* eas = NULL; + PACL pACL = NULL; + LPWSTR lpszSD = NULL; + ULONG cchSD = 0; + HANDLE hToken = INVALID_HANDLE_VALUE; + DWORD dwBufferSize = 0; + PTOKEN_USER pTokenUser = NULL; + + dwError = GetConfigValue(NM_WSCE_ALLOWED, &len, &value); + CHECK_SVC_STATUS_DONE(dwError, L"GetConfigValue"); + + if (0 == len) { + CHECK_SVC_STATUS_DONE(ERROR_BAD_CONFIGURATION, NM_WSCE_ALLOWED); + } + + dwError = SplitStringIgnoreSpaceW(len, value, L',', &count, &tokens); + CHECK_SVC_STATUS_DONE(dwError, L"SplitStringIgnoreSpaceW"); + LogDebugMessage(L"split: %d tokens:\n", count); + + allowedSids = (PSID*) LocalAlloc(LPTR, sizeof(PSID) * count); + for (crt = 0; crt < count; ++crt) { + LogDebugMessage(L"\ttoken %d:%s\n", crt, tokens[crt]); + dwError = GetSidFromAcctNameW(tokens[crt], &allowedSids[crt]); + CHECK_SVC_STATUS_DONE(dwError, L"GetSidFromAcctNameW"); + } + + allowedCount = count; + + dwError = BuildServiceSecurityDescriptor(SERVICE_ACCESS_MASK, + count, allowedSids, 0, NULL, &pAllowedSD); + CHECK_SVC_STATUS_DONE(dwError, L"BuildServiceSecurityDescriptor"); + +done: + if (pTokenUser) LocalFree(pTokenUser); + if (INVALID_HANDLE_VALUE != hToken) CloseHandle(hToken); + if (lpszSD) LocalFree(lpszSD); + if (pACL) LocalFree(pACL); + if (value) LocalFree(value); + return dwError; +} + +VOID RpcInit(DWORD dwArgc, LPTSTR *lpszArgv) { + RPC_STATUS status; + + status = RpcServerUseProtseqIf(SVCBINDING, + RPC_C_LISTEN_MAX_CALLS_DEFAULT, + Hdpwinutilsvc_v1_0_s_ifspec, + NULL); + if (RPC_S_OK != status) { + ReportSvcError(EVENTLOG_ERROR_TYPE, RPC_CATEGORY, + MSG_RPC_USE_PROT_SEQ_FAILED, status); + SvcError(status); + return; + } + + status = RpcServerRegisterIfEx(Hdpwinutilsvc_v1_0_s_ifspec, + NULL, // MgrTypeUuid + NULL, // MgrEpv + RPC_IF_ALLOW_LOCAL_ONLY, // Flags + RPC_C_LISTEN_MAX_CALLS_DEFAULT, // Max calls + RpcAuthorizeCallback); // Auth callback + + if (RPC_S_OK != status) { + ReportSvcError(EVENTLOG_ERROR_TYPE, RPC_CATEGORY, + MSG_RPC_REGISTER_IF_FAILED, status); + SvcError(status); + return; + } + + status = RpcServerListen(1, RPC_C_LISTEN_MAX_CALLS_DEFAULT, TRUE); + if (RPC_S_ALREADY_LISTENING == status) { + ReportSvcError(EVENTLOG_WARNING_TYPE, RPC_CATEGORY, + MSG_RPC_ALREADY_LISTENNING, status); + } + else if (RPC_S_OK != status) { + ReportSvcError(EVENTLOG_ERROR_TYPE, RPC_CATEGORY, + MSG_RPC_REGISTER_IF_FAILED, status); + SvcError(status); + return; + } + + isListenning = TRUE; + + ReportSvcError(EVENTLOG_INFORMATION_TYPE, RPC_CATEGORY, + MSG_RPC_SERVICE_HAS_STARTED, ERROR_SUCCESS); +} + +VOID RpcStop() { + RPC_STATUS status; + + if (isListenning) { + + status = RpcMgmtStopServerListening(NULL); + isListenning = FALSE; + + if (RPC_S_OK != status) { + ReportSvcError(EVENTLOG_WARNING_TYPE, RPC_CATEGORY, + MSG_RPC_STOP_SERVER_FAILED, status); + } + + ReportSvcError(EVENTLOG_INFORMATION_TYPE, RPC_CATEGORY, + MSG_RPC_SERVICE_HAS_STOPPED, ERROR_SUCCESS); + } +} + +VOID CleanupHandles() { + if (INVALID_HANDLE_VALUE != ghWaitObject) { + UnregisterWait(ghWaitObject); + ghWaitObject = INVALID_HANDLE_VALUE; + } + if (INVALID_HANDLE_VALUE != ghSvcStopEvent) { + CloseHandle(ghSvcStopEvent); + ghSvcStopEvent = INVALID_HANDLE_VALUE; + } + if (INVALID_HANDLE_VALUE != ghEventLog) { + DeregisterEventSource(ghEventLog); + ghEventLog = INVALID_HANDLE_VALUE; + } +} + +VOID SvcError(DWORD dwError) { + RpcStop(); + CleanupHandles(); + ReportSvcStatus( SERVICE_STOPPED, dwError, 0 ); +} + +VOID CALLBACK SvcShutdown( + _In_ PVOID lpParameter, + _In_ BOOLEAN TimerOrWaitFired) { + RpcStop(); + CleanupHandles(); + ReportSvcStatus( SERVICE_STOPPED, NO_ERROR, 0 ); +} + +VOID WINAPI SvcCtrlHandler( DWORD dwCtrl ) +{ + // Handle the requested control code. + + switch(dwCtrl) + { + case SERVICE_CONTROL_STOP: + ReportSvcStatus(SERVICE_STOP_PENDING, NO_ERROR, 0); + + // Signal the service to stop. + SetEvent(ghSvcStopEvent); + + return; + + case SERVICE_CONTROL_INTERROGATE: + break; + + default: + break; + } + +} + +VOID ReportSvcStatus( DWORD dwCurrentState, + DWORD dwWin32ExitCode, + DWORD dwWaitHint) +{ + static DWORD dwCheckPoint = 1; + DWORD dwError; + + // Fill in the SERVICE_STATUS structure. + + gSvcStatus.dwCurrentState = dwCurrentState; + gSvcStatus.dwWin32ExitCode = dwWin32ExitCode; + gSvcStatus.dwWaitHint = dwWaitHint; + + if (dwCurrentState == SERVICE_START_PENDING) + gSvcStatus.dwControlsAccepted = 0; + else gSvcStatus.dwControlsAccepted = SERVICE_ACCEPT_STOP; + + if ( (dwCurrentState == SERVICE_RUNNING) || + (dwCurrentState == SERVICE_STOPPED) ) + gSvcStatus.dwCheckPoint = 0; + else gSvcStatus.dwCheckPoint = dwCheckPoint++; + + // Report the status of the service to the SCM. + if (!SetServiceStatus( gSvcStatusHandle, &gSvcStatus)) { + dwError = GetLastError(); + ReportSvcError(EVENTLOG_WARNING_TYPE, SERVICE_CATEGORY, + MSG_SVC_STATUS_FAILED, dwError); + }; +} + +//---------------------------------------------------------------------------- +// Function: WinutilsCreateProcessAsUser +// +// Description: +// The RPC midl declared function implementation +// +// Returns: +// ERROR_SUCCESS: On success +// Error code otherwise: otherwise +// +// Notes: +// This is the entry point when the NodeManager does the RPC call +// Note that the RPC call does not do any S4U work. Is simply spawns (suspended) wintutils +// using the right command line and the handles over the spwaned process to the NM +// The actual S4U work occurs in the spawned process, run and monitored by the NM +// +error_status_t WinutilsCreateProcessAsUser( + /* [in] */ int nmPid, + /* [in] */ CREATE_PROCESS_REQUEST *request, + /* [out] */ CREATE_PROCESS_RESPONSE **response) { + + DWORD dwError = ERROR_SUCCESS; + LPCWSTR inserts[] = {request->cwd, request->jobName, request->user, request->pidFile, request->cmdLine, NULL}; + WCHAR winutilsPath[MAX_PATH]; + WCHAR fullCmdLine[32768]; + HANDLE taskStdInRd = INVALID_HANDLE_VALUE, taskStdInWr = INVALID_HANDLE_VALUE, + taskStdOutRd = INVALID_HANDLE_VALUE, taskStdOutWr = INVALID_HANDLE_VALUE, + taskStdErrRd = INVALID_HANDLE_VALUE, taskStdErrWr = INVALID_HANDLE_VALUE, + hNmProcess = INVALID_HANDLE_VALUE, + hDuplicateProcess = INVALID_HANDLE_VALUE, + hDuplicateThread = INVALID_HANDLE_VALUE, + hDuplicateStdIn = INVALID_HANDLE_VALUE, + hDuplicateStdOut = INVALID_HANDLE_VALUE, + hDuplicateStdErr = INVALID_HANDLE_VALUE, + hSelfProcess = INVALID_HANDLE_VALUE; + BOOL fMustCleanupProcess = FALSE; + + HRESULT hr; + STARTUPINFO si; + PROCESS_INFORMATION pi; + SECURITY_ATTRIBUTES saTaskStdInOutErr; + + ZeroMemory( &si, sizeof(si) ); + si.cb = sizeof(si); + ZeroMemory( &pi, sizeof(pi) ); + pi.hProcess = INVALID_HANDLE_VALUE; + pi.hThread = INVALID_HANDLE_VALUE; + ZeroMemory( &saTaskStdInOutErr, sizeof(saTaskStdInOutErr)); + + if (gVerbose) { + if (!ReportEvent(ghEventLog, EVENTLOG_INFORMATION_TYPE, + RPC_CATEGORY, MSG_RPC_CREATE_TASK_AS_USER, + NULL, // lpUserSid + (WORD) 5, // wNumStrings + (DWORD) 0, // dwDataSize + inserts, // *lpStrings + NULL)) // lpRawData + { + dwError = GetLastError(); + ReportSvcError(EVENTLOG_WARNING_TYPE, RPC_CATEGORY, + MSG_RPC_CREATE_TASK_AS_USER_LOG_FAILED, dwError); + } + } + + dwError = EnablePrivilege(SE_DEBUG_NAME); + if( dwError != ERROR_SUCCESS ) { + goto done; + } + + // NB: GetCurrentProcess returns a pseudo-handle that just so happens + // has the value -1, ie. INVALID_HANDLE_VALUE. It cannot fail. + // + hSelfProcess = GetCurrentProcess(); + + hNmProcess = OpenProcess(PROCESS_DUP_HANDLE, FALSE, nmPid); + if (NULL == hNmProcess) { + dwError = GetLastError(); + goto done; + } + + GetModuleFileName(NULL, winutilsPath, sizeof(winutilsPath)/sizeof(WCHAR)); + dwError = GetLastError(); // Always check after GetModuleFileName for ERROR_INSSUFICIENT_BUFFER + if (dwError) { + ReportSvcError(EVENTLOG_ERROR_TYPE, TASK_CATEGORY, + MSG_TASK_GET_MODULE_FILENAME_FAILED, dwError); + goto done; + } + + // NB. We can call CreateProcess("wintuls","task create ...") or we can call + // CreateProcess(NULL, "winutils task create"). Only the second form passes "task" as + // argv[1], as expected by main. First form passes "task" as argv[0] and main fails. + + hr = StringCbPrintf(fullCmdLine, sizeof(fullCmdLine), L"\"%s\" task createAsUser %ls %ls %ls %ls", + winutilsPath, + request->jobName, request->user, request->pidFile, request->cmdLine); + if (FAILED(hr)) { + ReportSvcError(EVENTLOG_ERROR_TYPE, TASK_CATEGORY, + MSG_TASK_BUILD_FULL_CMDLINE_FAILED, hr); + goto done; + } + + LogDebugMessage(L"[%ls]: %ls %ls\n", request->cwd, winutilsPath, fullCmdLine); + + // stdin/stdout/stderr redirection is handled here + // We create 3 anonimous named pipes. + // Security attributes are required so that the handles can be inherited. + // We assign one end of the pipe to the process (stdin gets a read end, stdout gets a write end) + // We then duplicate the other end in the NM process, and we close our own handle + // Finally we return the duplicate handle values to the NM + // The NM will attach Java file dscriptors to the duplicated handles and + // read/write them as ordinary Java InputStream/OutputStream objects + + si.dwFlags |= STARTF_USESTDHANDLES; + + saTaskStdInOutErr.nLength = sizeof(SECURITY_ATTRIBUTES); + saTaskStdInOutErr.bInheritHandle = TRUE; + saTaskStdInOutErr.lpSecurityDescriptor = NULL; + + if (!CreatePipe(&taskStdInRd, &taskStdInWr, &saTaskStdInOutErr, 0)) { + dwError = GetLastError(); + goto done; + } + if (!SetHandleInformation(taskStdInWr, HANDLE_FLAG_INHERIT, FALSE)) { + dwError = GetLastError(); + goto done; + } + si.hStdInput = taskStdInRd; + + if (!CreatePipe(&taskStdOutRd, &taskStdOutWr, &saTaskStdInOutErr, 0)) { + dwError = GetLastError(); + goto done; + } + if (!SetHandleInformation(taskStdOutRd, HANDLE_FLAG_INHERIT, FALSE)) { + dwError = GetLastError(); + goto done; + } + si.hStdOutput = taskStdOutWr; + + if (!CreatePipe(&taskStdErrRd, &taskStdErrWr, &saTaskStdInOutErr, 0)) { + dwError = GetLastError(); + goto done; + } + if (!SetHandleInformation(taskStdErrRd, HANDLE_FLAG_INHERIT, FALSE)) { + dwError = GetLastError(); + goto done; + } + si.hStdError = taskStdErrWr; + + if (!CreateProcess( + NULL, // lpApplicationName, + fullCmdLine, // lpCommandLine, + NULL, // lpProcessAttributes, + NULL, // lpThreadAttributes, + TRUE, // bInheritHandles, + CREATE_SUSPENDED, // dwCreationFlags, + NULL, // lpEnvironment, + request->cwd, // lpCurrentDirectory, + &si, // lpStartupInfo + &pi)) { // lpProcessInformation + + dwError = GetLastError(); + ReportSvcError(EVENTLOG_ERROR_TYPE, TASK_CATEGORY, + MSG_TASK_CREATE_PROCESS_FAILED, dwError); + goto done; + } + + fMustCleanupProcess = TRUE; + + LogDebugMessage(L"CreateProcess: pid:%x\n", pi.dwProcessId); + + if (!DuplicateHandle(hSelfProcess, pi.hProcess, hNmProcess, + &hDuplicateProcess, 0, FALSE, DUPLICATE_SAME_ACCESS)) { + dwError = GetLastError(); + LogDebugMessage(L"failed: pi.hProcess\n"); + goto done; + } + + if (!DuplicateHandle(hSelfProcess, pi.hThread, hNmProcess, + &hDuplicateThread, 0, FALSE, DUPLICATE_SAME_ACCESS)) { + dwError = GetLastError(); + LogDebugMessage(L"failed: pi.hThread\n"); + goto done; + } + + if (!DuplicateHandle(hSelfProcess, taskStdInWr, hNmProcess, + &hDuplicateStdIn, 0, FALSE, DUPLICATE_SAME_ACCESS)) { + dwError = GetLastError(); + LogDebugMessage(L"failed: taskStdInWr\n"); + goto done; + } + + if (!DuplicateHandle(hSelfProcess, taskStdOutRd, hNmProcess, + &hDuplicateStdOut, 0, FALSE, DUPLICATE_SAME_ACCESS)) { + dwError = GetLastError(); + LogDebugMessage(L"failed: taskStdOutRd\n"); + goto done; + } + + if (!DuplicateHandle(hSelfProcess, taskStdErrRd, hNmProcess, + &hDuplicateStdErr, 0, FALSE, DUPLICATE_SAME_ACCESS)) { + dwError = GetLastError(); + LogDebugMessage(L"failed: taskStdErrRd\n"); + goto done; + } + + *response = (CREATE_PROCESS_RESPONSE*) MIDL_user_allocate(sizeof(CREATE_PROCESS_RESPONSE)); + if (NULL == *response) { + dwError = ERROR_OUTOFMEMORY; + LogDebugMessage(L"Failed to allocate CREATE_PROCESS_RESPONSE* response\n"); + goto done; + } + + // We're now transfering ownership of the duplicated handles to the caller + // If the RPC call fails *after* this point the handles are leaked inside the NM process + + (*response)->hProcess = hDuplicateProcess; + (*response)->hThread = hDuplicateThread; + (*response)->hStdIn = hDuplicateStdIn; + (*response)->hStdOut = hDuplicateStdOut; + (*response)->hStdErr = hDuplicateStdErr; + + fMustCleanupProcess = FALSE; + +done: + + if (fMustCleanupProcess) { + LogDebugMessage(L"Cleaning process: %d due to error:%d\n", pi.dwProcessId, dwError); + TerminateProcess(pi.hProcess, EXIT_FAILURE); + + // cleanup the duplicate handles inside the NM. + + if (INVALID_HANDLE_VALUE != hDuplicateProcess) { + DuplicateHandle(hNmProcess, hDuplicateProcess, NULL, NULL, 0, FALSE, DUPLICATE_CLOSE_SOURCE); + } + if (INVALID_HANDLE_VALUE != hDuplicateThread) { + DuplicateHandle(hNmProcess, hDuplicateThread, NULL, NULL, 0, FALSE, DUPLICATE_CLOSE_SOURCE); + } + if (INVALID_HANDLE_VALUE != hDuplicateStdIn) { + DuplicateHandle(hNmProcess, hDuplicateStdIn, NULL, NULL, 0, FALSE, DUPLICATE_CLOSE_SOURCE); + } + if (INVALID_HANDLE_VALUE != hDuplicateStdOut) { + DuplicateHandle(hNmProcess, hDuplicateStdOut, NULL, NULL, 0, FALSE, DUPLICATE_CLOSE_SOURCE); + } + if (INVALID_HANDLE_VALUE != hDuplicateStdErr) { + DuplicateHandle(hNmProcess, hDuplicateStdErr, NULL, NULL, 0, FALSE, DUPLICATE_CLOSE_SOURCE); + } + } + + if (INVALID_HANDLE_VALUE != hSelfProcess) CloseHandle(hSelfProcess); + if (INVALID_HANDLE_VALUE != hNmProcess) CloseHandle(hNmProcess); + if (INVALID_HANDLE_VALUE != taskStdInRd) CloseHandle(taskStdInRd); + if (INVALID_HANDLE_VALUE != taskStdInWr) CloseHandle(taskStdInWr); + if (INVALID_HANDLE_VALUE != taskStdOutRd) CloseHandle(taskStdOutRd); + if (INVALID_HANDLE_VALUE != taskStdOutWr) CloseHandle(taskStdOutWr); + if (INVALID_HANDLE_VALUE != taskStdErrRd) CloseHandle(taskStdErrRd); + if (INVALID_HANDLE_VALUE != taskStdErrWr) CloseHandle(taskStdErrWr); + + + // This is closing our own process/thread handles. + // If the transfer was succesfull the NM has its own duplicates (if any) + if (INVALID_HANDLE_VALUE != pi.hThread) CloseHandle(pi.hThread); + if (INVALID_HANDLE_VALUE != pi.hProcess) CloseHandle(pi.hProcess); + + return dwError; +} + +void ServiceUsage() +{ + fwprintf(stdout, L"\ + Usage: service\n\ + Starts the impersonation helper service.\n\ + This should be called from the SCM.\n\ + The impersonation helper service must run as a high privileged account (LocalSystem)\n\ + and is used by the NodeManager to spawn secure containers.\n"); +} + + diff --git a/hadoop-common-project/hadoop-common/src/main/winutils/task.c b/hadoop-common-project/hadoop-common/src/main/winutils/task.c index 81f2e4f..1e87a29 100644 --- a/hadoop-common-project/hadoop-common/src/main/winutils/task.c +++ b/hadoop-common-project/hadoop-common/src/main/winutils/task.c @@ -19,10 +19,18 @@ #include #include #include +#include #define PSAPI_VERSION 1 #pragma comment(lib, "psapi.lib") + +#define NM_WSCE_IMPERSONATE_ALLOWED L"yarn.nodemanager.windows-secure-container-executor.impersonate.allowed" +#define NM_WSCE_IMPERSONATE_DENIED L"yarn.nodemanager.windows-secure-container-executor.impersonate.denied" + +// The S4U impersonation access check mask. Arbitrary value (we use 1 for the service access check) +#define SERVICE_IMPERSONATE_MASK 0x00000002 + #define ERROR_TASK_NOT_ALIVE 1 // This exit code for killed processes is compatible with Unix, where a killed @@ -104,6 +112,221 @@ static BOOL ParseCommandLine(__in int argc, return FALSE; } + +//---------------------------------------------------------------------------- +// Function: BuildImpersonateSecurityDescriptor +// +// Description: +// Builds the security descriptor for the S4U impersonation permissions +// This describes what users can be impersonated and what not +// +// Returns: +// ERROR_SUCCESS: On success +// GetLastError: otherwise +// +DWORD BuildImpersonateSecurityDescriptor(__out PSECURITY_DESCRIPTOR* ppSD) { + DWORD dwError = ERROR_SUCCESS; + size_t countAllowed = 0; + PSID* allowedSids = NULL; + size_t countDenied = 0; + PSID* deniedSids = NULL; + LPCWSTR value = NULL; + WCHAR** tokens = NULL; + size_t len = 0; + size_t count = 0; + int crt = 0; + PSECURITY_DESCRIPTOR pSD = NULL; + + dwError = GetConfigValue(NM_WSCE_IMPERSONATE_ALLOWED, &len, &value); + if (dwError) { + ReportErrorCode(L"GetConfigValue:1", dwError); + goto done; + } + + if (0 == len) { + dwError = ERROR_BAD_CONFIGURATION; + ReportErrorCode(L"GetConfigValue:2", dwError); + goto done; + } + + dwError = SplitStringIgnoreSpaceW(len, value, L',', &count, &tokens); + if (dwError) { + ReportErrorCode(L"SplitStringIgnoreSpaceW:1", dwError); + goto done; + } + + allowedSids = LocalAlloc(LPTR, sizeof(PSID) * count); + if (NULL == allowedSids) { + dwError = GetLastError(); + ReportErrorCode(L"LocalAlloc:1", dwError); + goto done; + } + + for(crt = 0; crt < count; ++crt) { + dwError = GetSidFromAcctNameW(tokens[crt], &allowedSids[crt]); + if (dwError) { + ReportErrorCode(L"GetSidFromAcctNameW:1", dwError); + goto done; + } + } + countAllowed = count; + + LocalFree(tokens); + tokens = NULL; + + LocalFree(value); + value = NULL; + + dwError = GetConfigValue(NM_WSCE_IMPERSONATE_DENIED, &len, &value); + if (dwError) { + ReportErrorCode(L"GetConfigValue:3", dwError); + goto done; + } + + if (0 != len) { + dwError = SplitStringIgnoreSpaceW(len, value, L',', &count, &tokens); + if (dwError) { + ReportErrorCode(L"SplitStringIgnoreSpaceW:2", dwError); + goto done; + } + + deniedSids = LocalAlloc(LPTR, sizeof(PSID) * count); + if (NULL == allowedSids) { + dwError = GetLastError(); + ReportErrorCode(L"LocalAlloc:2", dwError); + goto done; + } + + for(crt = 0; crt < count; ++crt) { + dwError = GetSidFromAcctNameW(tokens[crt], &deniedSids[crt]); + if (dwError) { + ReportErrorCode(L"GetSidFromAcctNameW:2", dwError); + goto done; + } + } + countDenied = count; + } + + dwError = BuildServiceSecurityDescriptor( + SERVICE_IMPERSONATE_MASK, + countAllowed, allowedSids, + countDenied, deniedSids, + &pSD); + + if (dwError) { + ReportErrorCode(L"BuildServiceSecurityDescriptor", dwError); + goto done; + } + + *ppSD = pSD; + pSD = NULL; + +done: + if (pSD) LocalFree(pSD); + if (tokens) LocalFree(tokens); + if (allowedSids) LocalFree(allowedSids); + if (deniedSids) LocalFree(deniedSids); + return dwError; +} + +//---------------------------------------------------------------------------- +// Function: ValidateImpersonateAccessCheck +// +// Description: +// Performs the access check for S4U impersonation +// +// Returns: +// ERROR_SUCCESS: On success +// ERROR_ACCESS_DENIED, GetLastError: otherwise +// +DWORD ValidateImpersonateAccessCheck(__in HANDLE logonHandle) { + DWORD dwError = ERROR_SUCCESS; + PSECURITY_DESCRIPTOR pSD = NULL; + LUID luidUnused; + AUTHZ_ACCESS_REQUEST request; + AUTHZ_ACCESS_REPLY reply; + DWORD authError = ERROR_SUCCESS; + DWORD saclResult = 0; + ACCESS_MASK grantedMask = 0; + AUTHZ_RESOURCE_MANAGER_HANDLE hManager = NULL; + AUTHZ_CLIENT_CONTEXT_HANDLE hAuthzToken = NULL; + + ZeroMemory(&luidUnused, sizeof(luidUnused)); + ZeroMemory(&request, sizeof(request)); + ZeroMemory(&reply, sizeof(reply)); + + dwError = BuildImpersonateSecurityDescriptor(&pSD); + if (dwError) { + ReportErrorCode(L"BuildImpersonateSecurityDescriptor", dwError); + goto done; + } + + request.DesiredAccess = MAXIMUM_ALLOWED; + reply.Error = &authError; + reply.SaclEvaluationResults = &saclResult; + reply.ResultListLength = 1; + reply.GrantedAccessMask = &grantedMask; + + if (!AuthzInitializeResourceManager( + AUTHZ_RM_FLAG_NO_AUDIT, + NULL, // pfnAccessCheck + NULL, // pfnComputeDynamicGroups + NULL, // pfnFreeDynamicGroups + NULL, // szResourceManagerName + &hManager)) { + dwError = GetLastError(); + ReportErrorCode(L"AuthzInitializeResourceManager", dwError); + goto done; + } + + if (!AuthzInitializeContextFromToken( + 0, + logonHandle, + hManager, + NULL, // expiration time + luidUnused, // not used + NULL, // callback args + &hAuthzToken)) { + dwError = GetLastError(); + ReportErrorCode(L"AuthzInitializeContextFromToken", dwError); + goto done; + } + + if (!AuthzAccessCheck( + 0, + hAuthzToken, + &request, + NULL, // AuditEvent + pSD, + NULL, // OptionalSecurityDescriptorArray + 0, // OptionalSecurityDescriptorCount + &reply, + NULL // phAccessCheckResults + )) { + dwError = GetLastError(); + ReportErrorCode(L"AuthzAccessCheck", dwError); + goto done; + } + + LogDebugMessage(L"AutzAccessCheck: Error:%d sacl:%d access:%d\n", + authError, saclResult, grantedMask); + + if (authError != ERROR_SUCCESS) { + ReportErrorCode(L"AuthzAccessCheck:REPLY:1", authError); + dwError = authError; + } + else if (!(grantedMask & SERVICE_IMPERSONATE_MASK)) { + ReportErrorCode(L"AuthzAccessCheck:REPLY:2", ERROR_ACCESS_DENIED); + dwError = ERROR_ACCESS_DENIED; + } + +done: + if (hAuthzToken) AuthzFreeContext(hAuthzToken); + if (hManager) AuthzFreeResourceManager(hManager); + if (pSD) LocalFree(pSD); + return dwError; +} + //---------------------------------------------------------------------------- // Function: CreateTaskImpl // @@ -131,6 +354,13 @@ DWORD CreateTaskImpl(__in_opt HANDLE logonHandle, __in PCWSTR jobObjName,__in PW wchar_t* curr_dir = NULL; FILE *stream = NULL; + if (NULL != logonHandle) { + dwErrorCode = ValidateImpersonateAccessCheck(logonHandle); + if (dwErrorCode) { + return dwErrorCode; + } + } + // Create un-inheritable job object handle and set job object to terminate // when last handle is closed. So winutils.exe invocation has the only open // job object handle. Exit of winutils.exe ensures termination of job object. @@ -305,7 +535,8 @@ DWORD CreateTask(__in PCWSTR jobObjName,__in PWSTR cmdLine) // Returns: // ERROR_SUCCESS: On success // GetLastError: otherwise -DWORD CreateTaskAsUser(__in PCWSTR jobObjName,__in PWSTR user, __in PWSTR pidFilePath, __in PWSTR cmdLine) +DWORD CreateTaskAsUser(__in PCWSTR jobObjName, + __in PCWSTR user, __in PCWSTR pidFilePath, __in PCWSTR cmdLine) { DWORD err = ERROR_SUCCESS; DWORD exitCode = EXIT_FAILURE; diff --git a/hadoop-common-project/hadoop-common/src/main/winutils/winutils.mc b/hadoop-common-project/hadoop-common/src/main/winutils/winutils.mc new file mode 100644 index 0000000..8a69504 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/winutils/winutils.mc @@ -0,0 +1,192 @@ +; // winutils.mc + +; // EventLog messages for Hadoop winutils service. + + +LanguageNames=(English=0x409:MSG00409) + + +; // The following are the categories of events. + +MessageIdTypedef=WORD + +MessageId=0x1 +SymbolicName=SERVICE_CATEGORY +Language=English +Service Events +. + +MessageId=0x2 +SymbolicName=RPC_CATEGORY +Language=English +RPC Events +. + +MessageId=0x3 +SymbolicName=TASK_CATEGORY +Language=English +Task Events +. + +MessageId=0x4 +SymbolicName=LOG_CATEGORY +Language=English +Task Events +. + +; // The following are the message definitions. + +MessageIdTypedef=DWORD + +MessageId=0x80 +SymbolicName=MSG_CHECK_ERROR +Language=English +%1. Error %2: %3. +. + +MessageId=0x100 +SymbolicName=MSG_SVC_REGISTER_EVENT_SOURCE_FAILED +Language=English +Failed to register the event source. Error %1: %2. +. + +MessageId=0x101 +SymbolicName=MSG_SVC_START_CTRL_DISPATCHER_FAILED +Language=English +Failed to start the service control dispatcher. Error %1: %2. +. + +MessageId=0x102 +SymbolicName=MSG_SVC_REGISTER_SERVICE_CTRL_HANDLER_FAILED +Language=English +Failed to register the service control handler. Error %1: %2. +. + +MessageId=0x103 +SymbolicName=MSG_SVC_CREATE_STOP_EVENT_FAILED +Language=English +Failed to create the service stop event. Error %1: %2. +. + +MessageId=0x104 +SymbolicName=MSG_SVC_REGISTER_STOP_WAIT_FUNCTION_FAILED +Language=English +Failed to register the callback for stop event. Error %1: %2. +. + +MessageId=0x105 +SymbolicName=MSG_SVC_STATUS_FAILED +Language=English +Failed to set the service status. Error %1: %2. +. + +MessageId=0x201 +SymbolicName=MSG_RPC_USE_PROT_SEQ_FAILED +Language=English +Failed to use protocol sequence and endpoint. Error %1: %2. +. + +MessageId=0x202 +SymbolicName=MSG_RPC_REGISTER_IF_FAILED +Language=English +Failed to use protocol sequence and endpoint. Error %1: %2. +. + +MessageId=0x203 +SymbolicName=MSG_RPC_LISTEN_FAILED +Language=English +Failed to listen for the LPC endpoint. Error %1: %2. +. + +MessageId=0x204 +SymbolicName=MSG_RPC_CREATE_TASK_AS_USER +Language=English +CreateTaskAsUser: cwd:%1 jobName=%2 user=%3 pidFile=%4 cmdLine=%5. +. + +MessageId=0x205 +SymbolicName=MSG_RPC_CREATE_TASK_AS_USER_LOG_FAILED +Language=English +Failed to report verbose CreateTaskAsUser call. Error %1: %2. +. + +MessageId=0x206 +SymbolicName=MSG_RPC_STOP_SERVER_FAILED +Language=English +Failed to stop the LPC server listenning. Error %1: %2. +. + +MessageId=0x207 +SymbolicName=MSG_RPC_STOP_WAIT_LISTEN_FAILED +Language=English +Failed to wait for all LPC calls to complete. Error %1: %2. +. + +MessageId=0x208 +SymbolicName=MSG_RPC_SERVICE_HAS_STARTED +Language=English +The LPC server is listenning. +. + +MessageId=0x209 +SymbolicName=MSG_RPC_SERVICE_HAS_STOPPED +Language=English +The LPC server has stopped listenning. +. + +MessageId=0x20a +SymbolicName=MSG_RPC_AUTHENTICATE_CLIENT +Language=English +RpcAuthorizeCallback called. +. + +MessageId=0x20b +SymbolicName=MSG_RPC_ALREADY_LISTENNING +Language=English +RpcServerListen reported RPC_S_ALREADY_LISTENING. +. + + +MessageId=0x301 +SymbolicName=MSG_TASK_CREATE_PROCESS_FAILED +Language=English +CreateProcess error: %1 %2. +. + +MessageId=0x302 +SymbolicName=MSG_TASK_GET_EXIT_CODE_FAILED +Language=English +GetExitCodeProcess error: %1 %2. +. + +MessageId=0x303 +SymbolicName=MSG_TASK_GET_MODULE_FILENAME_FAILED +Language=English +GetModuleFilename error: %1 %2. +. + +MessageId=0x304 +SymbolicName=MSG_TASK_BUILD_FULL_CMDLINE_FAILED +Language=English +StringCbPrintf error: %1 %2. +. + +MessageId=0x305 +SymbolicName=MSG_TASK_READ_PIPE_FAILED +Language=English +ReadFile error: %1 %2. +. + +MessageId=0x306 +SymbolicName=MSG_TASK_OPEN_NM_PROCESS +Language=English +OpenProcess error: %1 %2. +. + +MessageId=0x401 +SymbolicName=MSG_LOG_MESSAGE +Language=English +%1. +. + + diff --git a/hadoop-common-project/hadoop-common/src/main/winutils/winutils.vcxproj b/hadoop-common-project/hadoop-common/src/main/winutils/winutils.vcxproj index 5b9a195..f216b71 100644 --- a/hadoop-common-project/hadoop-common/src/main/winutils/winutils.vcxproj +++ b/hadoop-common-project/hadoop-common/src/main/winutils/winutils.vcxproj @@ -1,5 +1,4 @@  - - @@ -86,11 +84,6 @@ true - - true - - ..\..\..\target\winutils\$(Configuration)\ - false @@ -124,6 +117,10 @@ Console true + + X64 + ..\..\..\target\winutils\$(Configuration)\ + @@ -159,7 +156,40 @@ true + + + $(IntermediateOutputPath) + + + Compiling Messages + mc.exe $(TargetName).mc -z $(TargetName)_msg -r $(IntermediateOutputPath) -h $(IntermediateOutputPath) -U + $(IntermediateOutputPath)\$(TargetName)_msg.rc,$(IntermediateOutputPath)\$(TargetName)_msg.h + + + true + X64 + $(IntermediateOutputPath) + true + true + true + 2 + + + + Midl + ClCompile,ResourceCompile + + + + + + + + + + + @@ -179,4 +209,4 @@ - \ No newline at end of file + diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ProcessTree.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ProcessTree.java index 2f8b84d..1e2d16e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ProcessTree.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ProcessTree.java @@ -296,7 +296,7 @@ public static boolean isAlive(String pid) { return false; } catch (IOException ioe) { LOG.warn("Error executing shell command " - + Arrays.toString(shexec.getExecString()) + ioe); + + shexec.toString() + ioe); return false; } return (shexec.getExitCode() == 0 ? true : false); @@ -321,7 +321,7 @@ public static boolean isProcessGroupAlive(String pgrpId) { return false; } catch (IOException ioe) { LOG.warn("Error executing shell command " - + Arrays.toString(shexec.getExecString()) + ioe); + + shexec.toString() + ioe); return false; } return (shexec.getExitCode() == 0 ? true : false); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java index 54866bb..b72bbcf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java @@ -30,9 +30,11 @@ import java.util.Arrays; import java.util.EnumSet; import java.util.List; +import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.UnsupportedFileSystemException; @@ -40,6 +42,7 @@ import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.Shell.ExitCodeException; +import org.apache.hadoop.util.Shell.ICommandExecutor; import org.apache.hadoop.util.Shell.ShellCommandExecutor; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -182,20 +185,16 @@ public int launchContainer(Container container, // create log dir under app // fork script - ShellCommandExecutor shExec = null; + Shell.ICommandExecutor shExec = null; try { setScriptExecutable(launchDst, userName); setScriptExecutable(sb.getWrapperScriptPath(), userName); - // Setup command to run - String[] command = getRunCommand(sb.getWrapperScriptPath().toString(), - containerIdStr, userName, pidFile, this.getConf()); - - LOG.info("launchContainer: " + Arrays.toString(command)); - shExec = new ShellCommandExecutor( - command, + shExec = buildCommandExecutor(sb.getWrapperScriptPath().toString(), + containerIdStr, userName, pidFile, this.getConf(), new File(containerWorkDir.toUri().getPath()), - container.getLaunchContext().getEnvironment()); // sanitized env + container.getLaunchContext().getEnvironment()); + if (isContainerActive(containerId)) { shExec.execute(); } @@ -229,11 +228,25 @@ public int launchContainer(Container container, } return exitCode; } finally { - ; // + shExec.dispose(); // } return 0; } + protected ICommandExecutor buildCommandExecutor(String wrapperScriptPath, String containerIdStr, + String userName, Path pidFile, Configuration conf, File wordDir, Map environment) + throws IOException { + + String[] command = getRunCommand(wrapperScriptPath, + containerIdStr, userName, pidFile, this.getConf()); + + LOG.info("launchContainer: " + Arrays.toString(command)); + return new ShellCommandExecutor( + command, + wordDir, + environment); + } + protected LocalWrapperScriptBuilder getLocalWrapperScriptBuilder( String containerIdStr, Path containerWorkDir) { return Shell.WINDOWS ? diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/WindowsSecureContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/WindowsSecureContainerExecutor.java index 30beaf8..672cc36 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/WindowsSecureContainerExecutor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/WindowsSecureContainerExecutor.java @@ -17,25 +17,33 @@ */ package org.apache.hadoop.yarn.server.nodemanager; +import java.io.BufferedReader; import java.io.File; import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; import java.io.PrintStream; +import java.io.Reader; import java.net.InetSocketAddress; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.io.nativeio.NativeIO; +import org.apache.hadoop.io.nativeio.NativeIO.WinutilsProcessStub; import org.apache.hadoop.util.Shell; +import org.apache.hadoop.util.Shell.ICommandExecutor; import org.apache.hadoop.util.Shell.ShellCommandExecutor; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.LocalWrapperScriptBuilder; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; /** @@ -43,7 +51,7 @@ * */ public class WindowsSecureContainerExecutor extends DefaultContainerExecutor { - + private static final Log LOG = LogFactory .getLog(WindowsSecureContainerExecutor.class); @@ -59,6 +67,115 @@ protected void writeLocalWrapperScript(Path launchDst, Path pidFile, PrintStream pout.format("@call \"%s\"", launchDst); } } + + private class WintuilsProcessStubExecutor implements Shell.ICommandExecutor { + private WinutilsProcessStub processStub; + private StringBuilder output = new StringBuilder(); + private int exitCode; + private boolean complete = false; + + private final String cwd; + private final String jobName; + private final String userName; + private final String pidFile; + private final String cmdLine; + + public WintuilsProcessStubExecutor(WinutilsProcessStub processStub) { + this.processStub = processStub; + cwd = jobName = userName = pidFile = cmdLine = null; + } + + public WintuilsProcessStubExecutor( + String cwd, + String jobName, + String userName, + String pidFile, + String cmdLine) { + this.cwd = cwd; + this.jobName = jobName; + this.userName = userName; + this.pidFile = pidFile; + this.cmdLine = cmdLine; + } + + private void assumeComplete() throws IOException { + if (!complete) { + throw new IOException("Process is not complete"); + } + } + + public String getOutput () throws IOException { + assumeComplete(); + return output.toString(); + } + + public int getExitCode() throws IOException { + assumeComplete(); + return exitCode; + } + + public void validateResult() throws IOException { + assumeComplete(); + if (0 != exitCode) { + LOG.warn(output.toString()); + throw new IOException("Processs exit code is:" + exitCode); + } + } + + private Thread startStreamReader(final InputStream stream) throws IOException { + Thread streamReaderThread = new Thread() { + + @Override + public void run() { + try + { + BufferedReader rdr = new BufferedReader( + new InputStreamReader(stream)); + + String line = rdr.readLine(); + while((line != null) && !isInterrupted()) { + synchronized(output) { + output.append(line); + output.append(System.getProperty("line.separator")); + } + line = rdr.readLine(); + } + } + catch(Throwable t) { + LOG.error("Error occured reading the process stdout", t); + } + } + }; + streamReaderThread.start(); + return streamReaderThread; + } + + public void execute() throws IOException { + if (null == processStub) { + processStub = NativeIO.createTaskAsUser(cwd, jobName, userName, pidFile, cmdLine); + } + Thread stdOutReader = startStreamReader(processStub.getInputStream()); + Thread stdErrReader = startStreamReader(processStub.getErrorStream()); + + try { + processStub.resume(); + processStub.waitFor(); + stdOutReader.join(); + stdErrReader.join(); + } + catch(InterruptedException ie) { + throw new IOException(ie); + } + + exitCode = processStub.exitValue(); + complete = true; + } + + @Override + public void dispose() { + processStub.dispose(); + } + } private String nodeManagerGroup; @@ -133,13 +250,6 @@ public void startLocalizer(Path nmPrivateContainerTokens, command = new ArrayList(); - command.add(Shell.WINUTILS); - command.add("task"); - command.add("createAsUser"); - command.add("START_LOCALIZER_" + locId); - command.add(user); - command.add("nul:"); // PID file - //use same jvm as parent File jvm = new File(new File(System.getProperty("java.home"), "bin"), "java.exe"); command.add(jvm.toString()); @@ -160,12 +270,35 @@ public void startLocalizer(Path nmPrivateContainerTokens, } ContainerLocalizer.buildMainArgs(command, user, appId, locId, nmAddr, localDirs); - commandArray = command.toArray(new String[command.size()]); - - shExec = new ShellCommandExecutor( - commandArray, cwdApp); - - shExec.execute(); + + String cmdLine = StringUtils.join(command, " "); + + WinutilsProcessStub processStub = NativeIO.createTaskAsUser(cwdApp.getAbsolutePath(), + "START_LOCALIZER_" + locId, user, "nul:", cmdLine); + try { + WintuilsProcessStubExecutor stubExecutor = new WintuilsProcessStubExecutor(processStub); + stubExecutor.execute(); + stubExecutor.validateResult(); + } + finally { + processStub.dispose(); + } } + + @Override + protected ICommandExecutor buildCommandExecutor(String wrapperScriptPath, String containerIdStr, + String userName, Path pidFile, Configuration conf, File wordDir, Map environment) throws IOException { + + String[] command = getRunCommand(wrapperScriptPath, + containerIdStr, userName, pidFile, this.getConf()); + + LOG.info("launchContainer (hdpwinutils): " + Arrays.toString(command)); + + return new WintuilsProcessStubExecutor( + wordDir.toString(), + containerIdStr, userName, pidFile.toString(), "cmd /c " + wrapperScriptPath); + } } + + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java index 3525170..762565b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java @@ -369,10 +369,13 @@ public static void main(String[] argv) throws Throwable { new ContainerLocalizer(FileContext.getLocalFSFileContext(), user, appId, locId, localDirs, RecordFactoryProvider.getRecordFactory(null)); - System.exit(localizer.runLocalization(nmAddr)); + int nRet = localizer.runLocalization(nmAddr); + LOG.info(String.format("nRet: %d", nRet)); + System.exit(nRet); } catch (Throwable e) { // Print error to stdout so that LCE can use it. e.printStackTrace(System.out); + LOG.error("Exception in main:", e); throw e; } }