diff --git hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
index 976a93f..a7f5b9c 100644
--- hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
+++ hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
@@ -17,11 +17,14 @@
*/
package org.apache.hadoop.io.nativeio;
+import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileDescriptor;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.io.RandomAccessFile;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
@@ -833,4 +836,71 @@ public static void renameTo(File src, File dst)
*/
private static native void renameTo0(String src, String dst)
throws NativeIOException;
+
+ /**
+ * Wraps a process started by the winutils service helper.
+ *
+ */
+ public static class WinutilsProcessStub extends Process {
+
+ private final long hProcess;
+ private final long hThread;
+ private boolean disposed = false;
+
+ private final InputStream stdErr;
+ private final InputStream stdOut;
+ private final OutputStream stdIn;
+
+ public WinutilsProcessStub(long hProcess, long hThread, long hStdIn, long hStdOut, long hStdErr) {
+ this.hProcess = hProcess;
+ this.hThread = hThread;
+
+ this.stdIn = new FileOutputStream(getFileDescriptorFromHandle(hStdIn));
+ this.stdOut = new FileInputStream(getFileDescriptorFromHandle(hStdOut));
+ this.stdErr = new FileInputStream(getFileDescriptorFromHandle(hStdErr));
+ }
+
+ private static native FileDescriptor getFileDescriptorFromHandle(long handle);
+
+ @Override
+ public native void destroy();
+
+ @Override
+ public native int exitValue();
+
+ @Override
+ public InputStream getErrorStream() {
+ return stdErr;
+ }
+ @Override
+ public InputStream getInputStream() {
+ return stdOut;
+ }
+ @Override
+ public OutputStream getOutputStream() {
+ return stdIn;
+ }
+ @Override
+ public native int waitFor() throws InterruptedException;
+
+ public synchronized native void dispose();
+
+ public native void resume() throws NativeIOException;
+ }
+
+ public synchronized static WinutilsProcessStub createTaskAsUser(
+ String cwd, String jobName, String user, String pidFile, String cmdLine)
+ throws IOException {
+ if (!nativeLoaded) {
+ throw new IOException("NativeIO libraries are required for createTaskAsUser");
+ }
+ synchronized(Shell.WindowsProcessLaunchLock) {
+ return createTaskAsUser0(cwd, jobName, user, pidFile, cmdLine);
+ }
+ }
+
+ private static native WinutilsProcessStub createTaskAsUser0(
+ String cwd, String jobName, String user, String pidFile, String cmdLine)
+ throws NativeIOException;
+
}
diff --git hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Shell.java hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Shell.java
index fcdc021..67297cd 100644
--- hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Shell.java
+++ hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Shell.java
@@ -643,6 +643,18 @@ public String toString() {
}
}
+ public interface ICommandExecutor {
+
+ void execute() throws IOException;
+
+ int getExitCode() throws IOException;
+
+ String getOutput() throws IOException;
+
+ void dispose();
+
+ }
+
/**
* A simple shell command executor.
*
@@ -651,7 +663,7 @@ public String toString() {
* directory and the environment remains unchanged. The output of the command
* is stored as-is and is expected to be small.
*/
- public static class ShellCommandExecutor extends Shell {
+ public static class ShellCommandExecutor extends Shell implements ICommandExecutor {
private String[] command;
private StringBuffer output;
@@ -743,6 +755,10 @@ public String toString() {
}
return builder.toString();
}
+
+ @Override
+ public void dispose() {
+ }
}
/**
diff --git hadoop-common-project/hadoop-common/src/main/native/native.vcxproj hadoop-common-project/hadoop-common/src/main/native/native.vcxproj
index 0d67e1e..e743788 100644
--- hadoop-common-project/hadoop-common/src/main/native/native.vcxproj
+++ hadoop-common-project/hadoop-common/src/main/native/native.vcxproj
@@ -99,6 +99,7 @@
+
diff --git hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c
index 95bb987..022c8c8 100644
--- hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c
+++ hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c
@@ -49,6 +49,7 @@
#include "file_descriptor.h"
#include "errno_enum.h"
+#include "winutils_process_stub.h"
#define MMAP_PROT_READ org_apache_hadoop_io_nativeio_NativeIO_POSIX_MMAP_PROT_READ
#define MMAP_PROT_WRITE org_apache_hadoop_io_nativeio_NativeIO_POSIX_MMAP_PROT_WRITE
@@ -68,8 +69,13 @@ static jmethodID nioe_ctor;
// Please see HADOOP-7156 for details.
jobject pw_lock_object;
+/*
+ * Throw a java.IO.IOException, generating the message from errno.
+ * NB. this is also used form winutils_process_stub.c
+ */
+extern void throw_ioe(JNIEnv* env, int errnum);
+
// Internal functions
-static void throw_ioe(JNIEnv* env, int errnum);
#ifdef UNIX
static ssize_t get_pw_buflen();
#endif
@@ -191,6 +197,12 @@ Java_org_apache_hadoop_io_nativeio_NativeIO_initNative(
errno_enum_init(env);
PASS_EXCEPTIONS_GOTO(env, error);
#endif
+
+#ifdef WINDOWS
+ winutils_process_stub_init(env);
+ PASS_EXCEPTIONS_GOTO(env, error);
+#endif
+
return;
error:
// these are all idempodent and safe to call even if the
@@ -203,6 +215,9 @@ error:
#ifdef UNIX
errno_enum_deinit(env);
#endif
+#ifdef WINDOWS
+ winutils_process_stub_deinit(env);
+#endif
}
/*
@@ -766,7 +781,7 @@ cleanup:
/*
* Throw a java.IO.IOException, generating the message from errno.
*/
-static void throw_ioe(JNIEnv* env, int errnum)
+void throw_ioe(JNIEnv* env, int errnum)
{
#ifdef UNIX
char message[80];
@@ -1072,6 +1087,85 @@ JNIEnv *env, jclass clazz)
#endif
}
+
+/*
+ * Class: org_apache_hadoop_io_nativeio_NativeIO
+ * Method: createTaskAsUser
+ * Signature: (Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String)Lorg/apache/hadoop/io/nativeio/NativeIO$WinutilsProcessStub
+ */
+JNIEXPORT jobject JNICALL
+Java_org_apache_hadoop_io_nativeio_NativeIO_createTaskAsUser0(JNIEnv* env,
+ jclass clazz, jstring cwd, jstring jobName, jstring user, jstring pidFile, jstring cmdLine) {
+#ifdef UNIX
+ THROW(env, "java/io/IOException",
+ "The function createTaskAsUser is not supported on Unix");
+ return -1;
+#endif
+
+#ifdef WINDOWS
+ LPCWSTR lpszCwd = NULL, lpszJobName = NULL,
+ lpszUser = NULL, lpszPidFile = NULL, lpszCmdLine = NULL;
+ DWORD dwError = ERROR_SUCCESS;
+ HANDLE hProcess = INVALID_HANDLE_VALUE,
+ hThread = INVALID_HANDLE_VALUE,
+ hStdIn = INVALID_HANDLE_VALUE,
+ hStdOut = INVALID_HANDLE_VALUE,
+ hStdErr = INVALID_HANDLE_VALUE;
+ jobject ret = NULL;
+
+ lpszCwd = (LPCWSTR) (*env)->GetStringChars(env, cwd, NULL);
+ if (!lpszCwd) goto done; // exception was thrown
+
+ lpszJobName = (LPCWSTR) (*env)->GetStringChars(env, jobName, NULL);
+ if (!lpszJobName) goto done; // exception was thrown
+
+ lpszUser = (LPCWSTR) (*env)->GetStringChars(env, user, NULL);
+ if (!lpszUser) goto done; // exception was thrown
+
+ lpszPidFile = (LPCWSTR) (*env)->GetStringChars(env, pidFile, NULL);
+ if (!lpszPidFile) goto done; // exception was thrown
+
+ lpszCmdLine = (LPCWSTR) (*env)->GetStringChars(env, cmdLine, NULL);
+ if (!lpszCmdLine) goto done; // exception was thrown
+
+ LogDebugMessage(L"createTaskAsUser: cwd:%s job:%s user:%s pid:%s cmd:%s\n",
+ lpszCwd, lpszJobName, lpszUser, lpszPidFile, lpszCmdLine);
+
+ dwError = RpcCall_TaskCreateAsUser(lpszCwd, lpszJobName, lpszUser, lpszPidFile, lpszCmdLine,
+ &hProcess, &hThread, &hStdIn, &hStdOut, &hStdErr);
+
+ if (ERROR_SUCCESS == dwError) {
+ ret = winutils_process_stub_create(env, (jlong) hProcess, (jlong) hThread,
+ (jlong) hStdIn, (jlong) hStdOut, (jlong) hStdErr);
+
+ if (NULL == ret) {
+ TerminateProcess(hProcess, EXIT_FAILURE);
+ CloseHandle(hThread);
+ CloseHandle(hProcess);
+ CloseHandle(hStdIn);
+ CloseHandle(hStdOut);
+ CloseHandle(hStdErr);
+ }
+ }
+
+done:
+
+ if (lpszCwd) (*env)->ReleaseStringChars(env, cwd, lpszCwd);
+ if (lpszJobName) (*env)->ReleaseStringChars(env, jobName, lpszJobName);
+ if (lpszUser) (*env)->ReleaseStringChars(env, user, lpszUser);
+ if (lpszPidFile) (*env)->ReleaseStringChars(env, pidFile, lpszPidFile);
+ if (lpszCmdLine) (*env)->ReleaseStringChars(env, cmdLine, lpszCmdLine);
+
+ if (dwError != ERROR_SUCCESS) {
+ throw_ioe (env, dwError);
+ }
+
+ return ret;
+
+#endif
+}
+
+
/**
* vim: sw=2: ts=2: et:
*/
diff --git hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/winutils_process_stub.c hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/winutils_process_stub.c
new file mode 100644
index 0000000..d8afcca
--- /dev/null
+++ hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/winutils_process_stub.c
@@ -0,0 +1,189 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with this
+* work for additional information regarding copyright ownership. The ASF
+* licenses this file to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+* License for the specific language governing permissions and limitations under
+* the License.
+*/
+
+#include
+#include "org_apache_hadoop.h"
+#include "winutils_process_stub.h"
+#include "winutils.h"
+#include "file_descriptor.h"
+
+// class of org.apache.hadoop.io.nativeio.NativeIO.WinutilsProcessStub
+static jclass wps_class = NULL;
+
+
+static jmethodID wps_constructor = NULL;
+static jfieldID wps_hProcess = NULL;
+static jfieldID wps_hThread = NULL;
+static jfieldID wps_disposed = NULL;
+
+extern void throw_ioe(JNIEnv* env, int errnum);
+
+void winutils_process_stub_init(JNIEnv *env) {
+ if (wps_class != NULL) return; // already initted
+
+ wps_class = (*env)->FindClass(env, WINUTILS_PROCESS_STUB_CLASS);
+ PASS_EXCEPTIONS(env);
+ wps_class = (*env)->NewGlobalRef(env, wps_class);
+
+ wps_hProcess = (*env)->GetFieldID(env, wps_class, "hProcess", "J");
+ PASS_EXCEPTIONS(env);
+
+ wps_hThread = (*env)->GetFieldID(env, wps_class, "hThread", "J");
+ PASS_EXCEPTIONS(env);
+
+ wps_disposed = (*env)->GetFieldID(env, wps_class, "disposed", "Z");
+ PASS_EXCEPTIONS(env);
+
+ wps_constructor = (*env)->GetMethodID(env, wps_class, "", "(JJJJJ)V");
+
+ LogDebugMessage(L"winutils_process_stub_init\n");
+}
+
+void winutils_process_stub_deinit(JNIEnv *env) {
+ if (wps_class != NULL) {
+ (*env)->DeleteGlobalRef(env, wps_class);
+ wps_class = NULL;
+ }
+ wps_hProcess = NULL;
+ wps_hThread = NULL;
+ wps_disposed = NULL;
+ wps_constructor = NULL;
+ LogDebugMessage(L"winutils_process_stub_deinit\n");
+}
+
+jobject winutils_process_stub_create(JNIEnv *env,
+ jlong hProcess, jlong hThread, jlong hStdIn, jlong hStdOut, jlong hStdErr) {
+ jobject obj = (*env)->NewObject(env, wps_class, wps_constructor,
+ hProcess, hThread, hStdIn, hStdOut, hStdErr);
+ PASS_EXCEPTIONS_RET(env, NULL);
+
+ LogDebugMessage(L"winutils_process_stub_create: %p\n", obj);
+
+ return obj;
+}
+
+
+/*
+ * native void destroy();
+ *
+ * The "00024" in the function name is an artifact of how JNI encodes
+ * special characters. U+0024 is '$'.
+ */
+JNIEXPORT void JNICALL
+Java_org_apache_hadoop_io_nativeio_NativeIO_00024WinutilsProcessStub_destroy(
+ JNIEnv *env, jobject objSelf) {
+
+ HANDLE hProcess = (HANDLE)(*env)->GetLongField(env, objSelf, wps_hProcess);
+ LogDebugMessage(L"TerminateProcess: %x\n", hProcess);
+ TerminateProcess(hProcess, EXIT_FAILURE);
+}
+
+/*
+ * native void waitFor();
+ *
+ * The "00024" in the function name is an artifact of how JNI encodes
+ * special characters. U+0024 is '$'.
+ */
+JNIEXPORT void JNICALL
+Java_org_apache_hadoop_io_nativeio_NativeIO_00024WinutilsProcessStub_waitFor(
+ JNIEnv *env, jobject objSelf) {
+
+ HANDLE hProcess = (HANDLE)(*env)->GetLongField(env, objSelf, wps_hProcess);
+ LogDebugMessage(L"WaitForSingleObject: %x\n", hProcess);
+ WaitForSingleObject(hProcess, INFINITE);
+}
+
+
+
+/*
+ * native void resume();
+ *
+ * The "00024" in the function name is an artifact of how JNI encodes
+ * special characters. U+0024 is '$'.
+ */
+JNIEXPORT void JNICALL
+Java_org_apache_hadoop_io_nativeio_NativeIO_00024WinutilsProcessStub_resume(
+ JNIEnv *env, jobject objSelf) {
+
+ DWORD dwError;
+ HANDLE hThread = (HANDLE)(*env)->GetLongField(env, objSelf, wps_hThread);
+ if (-1 == ResumeThread(hThread)) {
+ dwError = GetLastError();
+ LogDebugMessage(L"ResumeThread: %x error:%d\n", hThread, dwError);
+ throw_ioe(env, dwError);
+ }
+}
+
+/*
+ * native int exitValue();
+ *
+ * The "00024" in the function name is an artifact of how JNI encodes
+ * special characters. U+0024 is '$'.
+ */
+JNIEXPORT jint JNICALL
+Java_org_apache_hadoop_io_nativeio_NativeIO_00024WinutilsProcessStub_exitValue(
+ JNIEnv *env, jobject objSelf) {
+
+ DWORD exitCode;
+ HANDLE hProcess = (HANDLE)(*env)->GetLongField(env, objSelf, wps_hProcess);
+ GetExitCodeProcess(hProcess, &exitCode);
+ LogDebugMessage(L"GetExitCodeProcess: %x :%d\n", hProcess, exitCode);
+
+ return exitCode;
+}
+
+
+/*
+ * native void dispose();
+ *
+ * The "00024" in the function name is an artifact of how JNI encodes
+ * special characters. U+0024 is '$'.
+ */
+JNIEXPORT void JNICALL
+Java_org_apache_hadoop_io_nativeio_NativeIO_00024WinutilsProcessStub_dispose(
+ JNIEnv *env, jobject objSelf) {
+
+ HANDLE hProcess, hThread;
+
+ jboolean disposed = (*env)->GetBooleanField(env, objSelf, wps_disposed);
+
+ if (JNI_TRUE != disposed) {
+ hProcess = (HANDLE)(*env)->GetLongField(env, objSelf, wps_hProcess);
+ hThread = (HANDLE)(*env)->GetLongField(env, objSelf, wps_hThread);
+
+ CloseHandle(hProcess);
+ CloseHandle(hThread);
+ (*env)->SetBooleanField(env, objSelf, wps_disposed, JNI_TRUE);
+ LogDebugMessage(L"disposed: %p\n", objSelf);
+ }
+}
+
+
+/*
+ * native static FileDescriptor getFileDescriptorFromHandle(long handle);
+ *
+ * The "00024" in the function name is an artifact of how JNI encodes
+ * special characters. U+0024 is '$'.
+ */
+JNIEXPORT jobject JNICALL
+Java_org_apache_hadoop_io_nativeio_NativeIO_00024WinutilsProcessStub_getFileDescriptorFromHandle(
+ JNIEnv *env, jclass klass, jlong handle) {
+
+ LogDebugMessage(L"getFileDescriptorFromHandle: %x\n", handle);
+ return fd_create(env, (long) handle);
+}
+
diff --git hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/winutils_process_stub.h hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/winutils_process_stub.h
new file mode 100644
index 0000000..6ab8ad6
--- /dev/null
+++ hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/winutils_process_stub.h
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+
+#define WINUTILS_PROCESS_STUB_CLASS "org/apache/hadoop/io/nativeio/NativeIO$WinutilsProcessStub"
+
+void winutils_process_stub_init(JNIEnv *env);
+void winutils_process_stub_deinit(JNIEnv *env);
+jobject winutils_process_stub_create(JNIEnv *env,
+ jlong hProcess, jlong hThread, jlong hStdIn, jlong hStdOut, jlong hStdErr);
+
+
diff --git hadoop-common-project/hadoop-common/src/main/native/src/org_apache_hadoop.h hadoop-common-project/hadoop-common/src/main/native/src/org_apache_hadoop.h
index 92a6b27..3fd5a58 100644
--- hadoop-common-project/hadoop-common/src/main/native/src/org_apache_hadoop.h
+++ hadoop-common-project/hadoop-common/src/main/native/src/org_apache_hadoop.h
@@ -32,6 +32,7 @@
#define UNIX
#endif
+
/* A helper macro to 'throw' a java exception. */
#define THROW(env, exception_name, message) \
{ \
diff --git hadoop-common-project/hadoop-common/src/main/winutils/client.c hadoop-common-project/hadoop-common/src/main/winutils/client.c
new file mode 100644
index 0000000..3b8fda1
--- /dev/null
+++ hadoop-common-project/hadoop-common/src/main/winutils/client.c
@@ -0,0 +1,162 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with this
+* work for additional information regarding copyright ownership. The ASF
+* licenses this file to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+* License for the specific language governing permissions and limitations under
+* the License.
+*/
+
+#include "winutils.h"
+#include
+#include
+#include "hdpwinutilsvc_h.h"
+
+#pragma comment(lib, "Rpcrt4.lib")
+#pragma comment(lib, "advapi32.lib")
+
+static ACCESS_MASK CLIENT_MASK = 1;
+
+
+VOID ReportClientError(LPWSTR lpszLocation, DWORD dwError) {
+ LPWSTR debugMsg = NULL;
+ int len;
+ WCHAR hexError[32];
+ HRESULT hr;
+
+ if (IsDebuggerPresent()) {
+ len = FormatMessageW(
+ FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM,
+ NULL, dwError,
+ MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
+ (LPWSTR)&debugMsg, 0, NULL);
+
+ LogDebugMessage(L"%s: %s: %x: %.*s\n", GetSystemTimeString(), lpszLocation, dwError, len, debugMsg);
+ }
+
+ if (NULL != debugMsg) LocalFree(debugMsg);
+}
+
+DWORD RpcCall_TaskCreateAsUser(
+ LPCWSTR cwd, LPCWSTR jobName,
+ LPCWSTR user, LPCWSTR pidFile, LPCWSTR cmdLine,
+ HANDLE* phProcess, HANDLE* phThread, HANDLE* phStdIn, HANDLE* phStdOut, HANDLE* phStdErr)
+{
+ DWORD dwError = EXIT_FAILURE;
+ RPC_STATUS status;
+ LPWSTR lpszStringBinding = NULL;
+ ULONG ulCode;
+ DWORD dwSelfPid = GetCurrentProcessId();
+ CREATE_PROCESS_REQUEST request;
+ CREATE_PROCESS_RESPONSE *response = NULL;
+ RPC_SECURITY_QOS_V3 qos;
+ PSID pLocalSystemSid = NULL;
+ SID_IDENTIFIER_AUTHORITY authNT = SECURITY_NT_AUTHORITY;
+
+ if (!AllocateAndInitializeSid(&authNT, 1,
+ SECURITY_LOCAL_SYSTEM_RID,
+ 0, 0, 0, 0, 0, 0, 0,
+ &pLocalSystemSid)) {
+ dwError = GetLastError();
+ ReportClientError(L"AllocateAndInitializeSid", dwError);
+ goto done;
+ }
+
+ ZeroMemory(&qos, sizeof(qos));
+ qos.Version = RPC_C_SECURITY_QOS_VERSION_3;
+ qos.Capabilities = RPC_C_QOS_CAPABILITIES_LOCAL_MA_HINT | RPC_C_QOS_CAPABILITIES_MUTUAL_AUTH;
+ qos.IdentityTracking = RPC_C_QOS_IDENTITY_DYNAMIC;
+ qos.ImpersonationType = RPC_C_IMP_LEVEL_DEFAULT;
+ qos.Sid = pLocalSystemSid;
+
+ ZeroMemory(&request, sizeof(request));
+ request.cwd = cwd;
+ request.jobName = jobName;
+ request.user = user;
+ request.pidFile = pidFile;
+ request.cmdLine = cmdLine;
+
+ status = RpcStringBindingCompose(NULL,
+ SVCBINDING,
+ NULL,
+ SVCNAME,
+ NULL,
+ &lpszStringBinding);
+ if (RPC_S_OK != status) {
+ ReportClientError(L"RpcStringBindingCompose", status);
+ dwError = status;
+ goto done;
+ }
+
+ status = RpcBindingFromStringBinding(lpszStringBinding, &hHdpWinutilsSvcBinding);
+
+ if (RPC_S_OK != status) {
+ ReportClientError(L"RpcBindingFromStringBinding", status);
+ dwError = status;
+ goto done;
+ }
+
+ status = RpcBindingSetAuthInfoEx(
+ hHdpWinutilsSvcBinding,
+ NULL,
+ RPC_C_AUTHN_LEVEL_PKT_PRIVACY, // AuthnLevel
+ RPC_C_AUTHN_WINNT, // AuthnSvc
+ NULL, // AuthnIdentity (self)
+ RPC_C_AUTHZ_NONE, // AuthzSvc
+ &qos);
+ if (RPC_S_OK != status) {
+ ReportClientError(L"RpcBindingSetAuthInfoEx", status);
+ dwError = status;
+ goto done;
+ }
+
+ RpcTryExcept {
+ dwError = WinutilsCreateProcessAsUser(dwSelfPid, &request, &response);
+ }
+ RpcExcept(1) {
+ ulCode = RpcExceptionCode();
+ ReportClientError(L"RpcExcept", ulCode);
+ dwError = (DWORD) ulCode;
+ }
+ RpcEndExcept;
+
+ if (ERROR_SUCCESS == dwError) {
+ *phProcess = response->hProcess;
+ *phThread = response->hThread;
+ *phStdIn = response->hStdIn;
+ *phStdOut = response->hStdOut;
+ *phStdErr = response->hStdErr;
+ }
+
+ // From here on forward we do no change dwError even on RPC cleanup errors
+ status = RpcBindingFree(&hHdpWinutilsSvcBinding);
+ if (RPC_S_OK != status) {
+ ReportClientError(L"RpcBindingFree", status);
+ goto done;
+ }
+
+done:
+ if (pLocalSystemSid) FreeSid(pLocalSystemSid);
+
+ if (NULL != response) {
+ MIDL_user_free(response);
+ }
+
+ if (NULL != lpszStringBinding) {
+ status = RpcStringFree(&lpszStringBinding);
+ if (RPC_S_OK != status) {
+ ReportClientError(L"RpcStringFree", status);
+ }
+ }
+
+ return dwError;
+}
+
diff --git hadoop-common-project/hadoop-common/src/main/winutils/config.cpp hadoop-common-project/hadoop-common/src/main/winutils/config.cpp
new file mode 100644
index 0000000..8524f7c
--- /dev/null
+++ hadoop-common-project/hadoop-common/src/main/winutils/config.cpp
@@ -0,0 +1,133 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with this
+* work for additional information regarding copyright ownership. The ASF
+* licenses this file to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+* License for the specific language governing permissions and limitations under
+* the License.
+*/
+
+#include "winutils.h"
+#include
+#import "msxml6.dll"
+
+
+#define YARN_SITE_XML_PATH L"%HADOOP_CONF_DIR%\\yarn-site.xml"
+#define YARN_DEFAULT_XML_PATH L"%HADOOP_CONF_DIR%\\yarn-default.xml"
+
+
+#define ERROR_CHECK_HRESULT_DONE(hr, message) \
+ if (FAILED(hr)) { \
+ dwError = (DWORD) hr; \
+ LogDebugMessage(L"%s: %x", message, hr); \
+ goto done; \
+ }
+
+DWORD GetConfigValue(__in LPCWSTR keyName,
+ __out size_t* len, __out_bcount(len) LPCWSTR* value) {
+
+ DWORD dwError = ERROR_SUCCESS;
+ WCHAR xmlPath[MAX_PATH];
+
+ *len = 0;
+ *value = NULL;
+
+ if (0 == ExpandEnvironmentStrings(YARN_SITE_XML_PATH, xmlPath, MAX_PATH)) {
+ dwError = GetLastError();
+ goto done;
+ }
+
+ dwError = GetConfigValueFromXmlFile(xmlPath, keyName, len, value);
+ if (*len) {
+ goto done;
+ }
+
+ if (0 == ExpandEnvironmentStrings(YARN_DEFAULT_XML_PATH, xmlPath, MAX_PATH)) {
+ dwError = GetLastError();
+ goto done;
+ }
+
+ dwError = GetConfigValueFromXmlFile(xmlPath, keyName, len, value);
+
+done:
+ if (*len) {
+ LogDebugMessage(L"GetConfigValue:%d key:%s len:%d value:%.*s from:%s\n", dwError, keyName, *len, *len, *value, xmlPath);
+ }
+ return dwError;
+}
+
+
+DWORD GetConfigValueFromXmlFile(__in LPCWSTR xmlFile, __in LPCWSTR keyName,
+ __out size_t* outLen, __out_bcount(len) LPCWSTR* outValue) {
+
+ DWORD dwError = ERROR_SUCCESS;
+ HRESULT hr;
+ WCHAR keyXsl[8192];
+ size_t len = 0;
+ LPWSTR value = NULL;
+
+ *outLen = 0;
+ *outValue = NULL;
+
+ hr = StringCbPrintf(keyXsl, sizeof(keyXsl), L"//configuration/property[name='%s']/value/text()", keyName);
+ ERROR_CHECK_HRESULT_DONE(hr, L"StringCbPrintf");
+
+ hr = CoInitialize(NULL);
+ ERROR_CHECK_HRESULT_DONE(hr, L"CoInitialize");
+
+ try {
+ MSXML2::IXMLDOMDocument2Ptr pDoc;
+ hr = pDoc.CreateInstance(__uuidof(MSXML2::DOMDocument60), NULL, CLSCTX_INPROC_SERVER);
+ ERROR_CHECK_HRESULT_DONE(hr, L"CreateInstance");
+
+ pDoc->async = VARIANT_FALSE;
+ pDoc->validateOnParse = VARIANT_FALSE;
+ pDoc->resolveExternals = VARIANT_FALSE;
+
+ _variant_t file(xmlFile);
+
+ if (VARIANT_FALSE == pDoc->load(file)) {
+ dwError = pDoc->parseError->errorCode;
+ LogDebugMessage(L"load %s failed:%d %s\n", xmlFile, dwError,
+ static_cast(pDoc->parseError->Getreason()));
+ goto done;
+ }
+
+ MSXML2::IXMLDOMElementPtr pRoot = pDoc->documentElement;
+ MSXML2::IXMLDOMNodePtr keyNode = pRoot->selectSingleNode(keyXsl);
+
+ if (keyNode) {
+ _bstr_t bstrValue = static_cast<_bstr_t>(keyNode->nodeValue);
+ len = bstrValue.length();
+ value = (LPWSTR) LocalAlloc(LPTR, (len+1) * sizeof(WCHAR));
+ LPCWSTR lpwszValue = static_cast(bstrValue);
+ memcpy(value, lpwszValue, (len) * sizeof(WCHAR));
+ LogDebugMessage(L"key:%s :%.*s [%s]\n", keyName, len, value, lpwszValue);
+ *outLen = len;
+ *outValue = value;
+ }
+ else {
+ LogDebugMessage(L"node Xpath:%s not found in:%s\n", keyXsl, xmlFile);
+ }
+ }
+ catch(_com_error errorObject) {
+ dwError = errorObject.Error();
+ LogDebugMessage(L"catch _com_error:%x %s\n", dwError, errorObject.ErrorMessage());
+ goto done;
+ }
+
+done:
+ CoUninitialize();
+
+ return dwError;
+}
+
+
diff --git hadoop-common-project/hadoop-common/src/main/winutils/hdpwinutilsvc.idl hadoop-common-project/hadoop-common/src/main/winutils/hdpwinutilsvc.idl
new file mode 100644
index 0000000..2d8c2b3
--- /dev/null
+++ hadoop-common-project/hadoop-common/src/main/winutils/hdpwinutilsvc.idl
@@ -0,0 +1,35 @@
+import "oaidl.idl";
+import "ocidl.idl";
+
+[
+ uuid(0492311C-1718-4F53-A6EB-86AD7039988D),
+ version(1.0),
+ pointer_default(unique),
+ implicit_handle(handle_t hHdpWinutilsSvcBinding),
+ endpoint("ncalrpc:[hdpwinutilsvc]"),
+]
+interface Hdpwinutilsvc
+{
+ typedef struct {
+ [string] const wchar_t* cwd;
+ [string] const wchar_t* jobName;
+ [string] const wchar_t* user;
+ [string] const wchar_t* pidFile;
+ [string] const wchar_t* cmdLine;
+ } CREATE_PROCESS_REQUEST;
+
+ typedef struct {
+ LONG_PTR hProcess;
+ LONG_PTR hThread;
+ LONG_PTR hStdIn;
+ LONG_PTR hStdOut;
+ LONG_PTR hStdErr;
+ } CREATE_PROCESS_RESPONSE;
+
+
+ error_status_t WinutilsCreateProcessAsUser(
+ [in] int nmPid,
+ [in] CREATE_PROCESS_REQUEST *request,
+ [out] CREATE_PROCESS_RESPONSE **response);
+
+}
\ No newline at end of file
diff --git hadoop-common-project/hadoop-common/src/main/winutils/include/winutils.h hadoop-common-project/hadoop-common/src/main/winutils/include/winutils.h
index bae754c..628d435 100644
--- hadoop-common-project/hadoop-common/src/main/winutils/include/winutils.h
+++ hadoop-common-project/hadoop-common/src/main/winutils/include/winutils.h
@@ -30,6 +30,10 @@
#include
#include
+#ifdef __cplusplus
+extern "C" {
+#endif
+
enum EXIT_CODE
{
/* Common success exit code shared among all utilities */
@@ -178,3 +182,47 @@ DWORD LoadUserProfileForLogon(__in HANDLE logonHandle, __out PROFILEINFO * pi);
DWORD UnloadProfileForLogon(__in HANDLE logonHandle, __in PROFILEINFO * pi);
+DWORD RunService(__in int argc, __in_ecount(argc) wchar_t *argv[]);
+void ServiceUsage();
+
+LPCWSTR GetSystemTimeString();
+
+VOID LogDebugMessage(LPCWSTR format, ...);
+
+DWORD SplitStringIgnoreSpaceW(__in size_t len, __in_bcount(len) LPCWSTR source,
+ __in WCHAR deli,
+ __out size_t* count, __out_ecount(count) WCHAR*** out);
+
+DWORD GetConfigValue(
+ __in LPCWSTR keyName,
+ __out size_t* len,
+ __out_bcount(len) LPCWSTR* value);
+DWORD GetConfigValueFromXmlFile(
+ __in LPCWSTR xmlFile,
+ __in LPCWSTR keyName,
+ __out size_t* len,
+ __out_bcount(len) LPCWSTR* value);
+
+
+DWORD BuildServiceSecurityDescriptor(
+ __in ACCESS_MASK accessMask,
+ __in size_t grantSidCount,
+ __in_ecount(grantSidCount) PSID* pGrantSids,
+ __in size_t denySidCount,
+ __in_ecount(denySidCount) PSID* pDenySids,
+ __out PSECURITY_DESCRIPTOR* pSD);
+
+
+#define SVCNAME TEXT("hdpwinutilsvc")
+#define SVCBINDING TEXT("ncalrpc")
+
+int RpcCall_TaskCreateAsUser(
+ LPCWSTR cwd, LPCWSTR jobName,
+ LPCWSTR user, LPCWSTR pidFile, LPCWSTR cmdLine,
+ HANDLE* phProcess, HANDLE* phThread, HANDLE* phStdIn, HANDLE* phStdOut, HANDLE* phStdErr);
+
+#ifdef __cplusplus
+}
+#endif
+
+
diff --git hadoop-common-project/hadoop-common/src/main/winutils/libwinutils.c hadoop-common-project/hadoop-common/src/main/winutils/libwinutils.c
index 3de458c..873ff68 100644
--- hadoop-common-project/hadoop-common/src/main/winutils/libwinutils.c
+++ hadoop-common-project/hadoop-common/src/main/winutils/libwinutils.c
@@ -19,9 +19,15 @@
#pragma comment(lib, "netapi32.lib")
#pragma comment(lib, "Secur32.lib")
#pragma comment(lib, "Userenv.lib")
+#pragma comment(lib, "Ntdsapi.lib")
+
#include "winutils.h"
+#include
+#include
#include
#include
+#include
+#include
/*
* The array of 12 months' three-letter abbreviations
@@ -1706,10 +1712,12 @@ void ReportErrorCode(LPCWSTR func, DWORD err)
(LPWSTR)&msg, 0, NULL);
if (len > 0)
{
+ LogDebugMessage(L"%s error (%d): %s\n", func, err, msg);
fwprintf(stderr, L"%s error (%d): %s\n", func, err, msg);
}
else
{
+ LogDebugMessage(L"%s error code: %d.\n", func, err);
fwprintf(stderr, L"%s error code: %d.\n", func, err);
}
if (msg != NULL) LocalFree(msg);
@@ -2026,6 +2034,8 @@ done:
return loadProfileStatus;
}
+
+
DWORD UnloadProfileForLogon(__in HANDLE logonHandle, __in PROFILEINFO * pi)
{
DWORD touchProfileStatus = ERROR_ASSERTION_FAILURE; // Failure to set status should trigger error
@@ -2046,3 +2056,384 @@ DWORD UnloadProfileForLogon(__in HANDLE logonHandle, __in PROFILEINFO * pi)
done:
return touchProfileStatus;
}
+
+
+LPCWSTR GetSystemTimeString() {
+ __declspec(thread) static WCHAR buffer[1024];
+ DWORD dwError;
+ FILETIME ftime;
+ SYSTEMTIME systime;
+ LARGE_INTEGER counter, frequency;
+ int subSec;
+ double qpc;
+ HRESULT hr;
+ buffer[0] = L'\0';
+
+ // GetSystemTimePreciseAsFileTime is only available in Win8+ and our libs do not link against it
+
+ GetSystemTimeAsFileTime(&ftime);
+
+ if (!FileTimeToSystemTime(&ftime, &systime)) {
+ dwError = GetLastError();
+ LogDebugMessage(L"FileTimeToSystemTime error:%d\n", dwError);
+ goto done;
+ }
+
+ // Get the ms from QPC. GetSystemTimeAdjustment is ignored...
+
+ QueryPerformanceCounter(&counter);
+ QueryPerformanceFrequency(&frequency);
+
+ qpc = (double) counter.QuadPart / (double) frequency.QuadPart;
+ subSec = ((qpc - (long)qpc) * 1000000);
+
+ hr = StringCbPrintf(buffer, sizeof(buffer), L"%02d:%02d:%02d.%06d",
+ (int)systime.wHour, (int)systime.wMinute, (int)systime.wSecond, (int)subSec);
+
+ if (FAILED(hr)) {
+ LogDebugMessage(L"StringCbPrintf error:%d\n", hr);
+ }
+done:
+ return buffer;
+}
+
+
+//----------------------------------------------------------------------------
+// Function: LogDebugMessage
+//
+// Description:
+// Sends a message to the debugger console, if one is attached
+//
+// Notes:
+// Native debugger: windbg, ntsd, cdb, visual studio
+//
+VOID LogDebugMessage(LPCWSTR format, ...) {
+ LPWSTR buffer[8192];
+ va_list args;
+ HRESULT hr;
+
+ if (!IsDebuggerPresent()) return;
+
+ va_start(args, format);
+ hr = StringCbVPrintf(buffer, sizeof(buffer), format, args);
+ if (SUCCEEDED(hr)) {
+ OutputDebugString(buffer);
+ }
+ va_end(args);
+}
+
+//----------------------------------------------------------------------------
+// Function: SplitStringIgnoreSpaceW
+//
+// Description:
+// splits a null-terminated string based on a delimiter
+//
+// Returns:
+// ERROR_SUCCESS: on success
+// error code: otherwise
+//
+// Notes:
+// The tokes are also null-terminated
+// Caller should use LocalFree to clear outTokens
+//
+DWORD SplitStringIgnoreSpaceW(
+ __in size_t len,
+ __in_bcount(len) LPCWSTR source,
+ __in WCHAR deli,
+ __out size_t* count,
+ __out_ecount(count) WCHAR*** outTokens) {
+
+ size_t tokenCount = 0;
+ size_t crtSource;
+ size_t crtToken = 0;
+ WCHAR* lpwszTokenStart = NULL;
+ WCHAR* lpwszTokenEnd = NULL;
+ WCHAR* lpwszBuffer = NULL;
+ size_t tokenLength = 0;
+ size_t cchBufferLength = 0;
+ WCHAR crt;
+ WCHAR** tokens = NULL;
+ enum {BLANK, TOKEN, DELIMITER} State = BLANK;
+
+ for(crtSource = 0; crtSource < len; ++crtSource) {
+ crt = source[crtSource];
+ switch(State) {
+ case BLANK: // intentional fallthrough
+ case DELIMITER:
+ if (crt == deli) {
+ State = DELIMITER;
+ }
+ else if (!iswspace(crt)) {
+ ++tokenCount;
+ lpwszTokenEnd = lpwszTokenStart = source + crtSource;
+ State = TOKEN;
+ }
+ else {
+ State = BLANK;
+ }
+ break;
+ case TOKEN:
+ if (crt == deli) {
+ State = DELIMITER;
+ cchBufferLength += lpwszTokenEnd - lpwszTokenStart + 2;
+ }
+ else if (!iswspace(crt)) {
+ lpwszTokenEnd = source + crtSource;
+ }
+ break;
+ }
+ }
+
+ if (State == TOKEN) {
+ cchBufferLength += lpwszTokenEnd - lpwszTokenStart + 2;
+ }
+
+ LogDebugMessage(L"counted %d [buffer:%d] tokens in %s\n", tokenCount, cchBufferLength, source);
+
+ #define COPY_CURRENT_TOKEN \
+ tokenLength = lpwszTokenEnd - lpwszTokenStart + 1; \
+ tokens[crtToken] = lpwszBuffer; \
+ memcpy(tokens[crtToken], lpwszTokenStart, tokenLength*sizeof(WCHAR)); \
+ tokens[crtToken][tokenLength] = L'\0'; \
+ lpwszBuffer += (tokenLength+1); \
+ ++crtToken;
+
+ if (tokenCount) {
+
+ // We use one contigous memory for both the pointer arrays and the data copy buffers
+ // We cannot use in-place references (zero-copy) because the function users
+ // need null-terminated strings for the tokens
+
+ tokens = (WCHAR**) LocalAlloc(LPTR,
+ sizeof(WCHAR*) * tokenCount + // for the pointers
+ sizeof(WCHAR) * cchBufferLength); // for the data
+
+ // Data will be copied after the array
+ lpwszBuffer = (WCHAR*)(((BYTE*)tokens) + (sizeof(WCHAR*) * tokenCount));
+
+ State = BLANK;
+
+ for(crtSource = 0; crtSource < len; ++crtSource) {
+ crt = source[crtSource];
+ switch(State) {
+ case DELIMITER: // intentional fallthrough
+ case BLANK:
+ if (crt == deli) {
+ State = DELIMITER;
+ }
+ else if (!iswspace(crt)) {
+ lpwszTokenEnd = lpwszTokenStart = source + crtSource;
+ State = TOKEN;
+ }
+ else {
+ State = BLANK;
+ }
+ break;
+ case TOKEN:
+ if (crt == deli) {
+ COPY_CURRENT_TOKEN;
+ State = DELIMITER;
+ }
+ else if (!iswspace(crt)) {
+ lpwszTokenEnd = source + crtSource;
+ }
+ break;
+ }
+ }
+
+ // Copy out last token, if any
+ if (TOKEN == State) {
+ COPY_CURRENT_TOKEN;
+ }
+ }
+
+ *count = tokenCount;
+ *outTokens = tokens;
+
+ return ERROR_SUCCESS;
+}
+
+//----------------------------------------------------------------------------
+// Function: BuildServiceSecurityDescriptor
+//
+// Description:
+// Builds a security descriptor for an arbitrary object
+//
+// Returns:
+// ERROR_SUCCESS: on success
+// error code: otherwise
+//
+// Notes:
+// The SD is a of the self-contained flavor (offsets, not pointers)
+// Caller should use LocalFree to clear allocated pSD
+//
+DWORD BuildServiceSecurityDescriptor(
+ __in ACCESS_MASK accessMask,
+ __in size_t grantSidCount,
+ __in_ecount(grantSidCount) PSID* pGrantSids,
+ __in size_t denySidCount,
+ __in_ecount(denySidCount) PSID* pDenySids,
+ __out PSECURITY_DESCRIPTOR* pSD) {
+
+ DWORD dwError = ERROR_SUCCESS;
+ int crt = 0;
+ int len = 0;
+ EXPLICIT_ACCESS* eas = NULL;
+ LPWSTR lpszSD = NULL;
+ ULONG cchSD = 0;
+ HANDLE hToken = INVALID_HANDLE_VALUE;
+ DWORD dwBufferSize = 0;
+ PTOKEN_USER pTokenUser = NULL;
+ PTOKEN_PRIMARY_GROUP pTokenGroup = NULL;
+ PSECURITY_DESCRIPTOR pTempSD = NULL;
+ ULONG cbSD = 0;
+ TRUSTEE owner, group;
+
+ ZeroMemory(&owner, sizeof(owner));
+
+ // We'll need our own SID to add as SD owner
+ if (!OpenProcessToken(GetCurrentProcess(), TOKEN_QUERY, &hToken)) {
+ dwError = GetLastError();
+ goto done;
+ }
+
+ if (!GetTokenInformation(hToken, TokenUser, NULL, 0, &dwBufferSize)) {
+ dwError = GetLastError();
+ if (ERROR_INSUFFICIENT_BUFFER != dwError) {
+ goto done;
+ }
+ }
+
+ pTokenUser = (PTOKEN_USER) LocalAlloc(LPTR, dwBufferSize);
+ if (NULL == pTokenUser) {
+ dwError = GetLastError();
+ goto done;
+ }
+
+ if (!GetTokenInformation(hToken, TokenUser, pTokenUser, dwBufferSize, &dwBufferSize)) {
+ dwError = GetLastError();
+ goto done;
+ }
+
+ if (!IsValidSid(pTokenUser->User.Sid)) {
+ dwError = ERROR_INVALID_PARAMETER;
+ goto done;
+ }
+
+ dwBufferSize = 0;
+ if (!GetTokenInformation(hToken, TokenPrimaryGroup, NULL, 0, &dwBufferSize)) {
+ dwError = GetLastError();
+ if (ERROR_INSUFFICIENT_BUFFER != dwError) {
+ goto done;
+ }
+ }
+
+ pTokenGroup = (PTOKEN_USER) LocalAlloc(LPTR, dwBufferSize);
+ if (NULL == pTokenUser) {
+ dwError = GetLastError();
+ goto done;
+ }
+
+ if (!GetTokenInformation(hToken, TokenPrimaryGroup, pTokenGroup, dwBufferSize, &dwBufferSize)) {
+ dwError = GetLastError();
+ goto done;
+ }
+
+ if (!IsValidSid(pTokenGroup->PrimaryGroup)) {
+ dwError = ERROR_INVALID_PARAMETER;
+ goto done;
+ }
+
+ owner.TrusteeForm = TRUSTEE_IS_SID;
+ owner.TrusteeType = TRUSTEE_IS_UNKNOWN;
+ owner.ptstrName = (LPCWSTR) pTokenUser->User.Sid;
+
+ group.TrusteeForm = TRUSTEE_IS_SID;
+ group.TrusteeType = TRUSTEE_IS_UNKNOWN;
+ group.ptstrName = (LPCWSTR) pTokenGroup->PrimaryGroup;
+
+ eas = (EXPLICIT_ACCESS*) alloca(sizeof(EXPLICIT_ACCESS) * (grantSidCount + denySidCount));
+
+ // Build the granted list
+ for (crt = 0; crt < grantSidCount; ++crt) {
+ eas[crt].grfAccessPermissions = accessMask;
+ eas[crt].grfAccessMode = GRANT_ACCESS;
+ eas[crt].grfInheritance = NO_INHERITANCE;
+ eas[crt].Trustee.TrusteeForm = TRUSTEE_IS_SID;
+ eas[crt].Trustee.TrusteeType = TRUSTEE_IS_UNKNOWN;
+ eas[crt].Trustee.ptstrName = (LPCWSTR) pGrantSids[crt];
+ }
+
+ // Build the deny list
+ for (; crt < grantSidCount + denySidCount; ++crt) {
+ eas[crt].grfAccessPermissions = accessMask;
+ eas[crt].grfAccessMode = DENY_ACCESS;
+ eas[crt].grfInheritance = NO_INHERITANCE;
+ eas[crt].Trustee.TrusteeForm = TRUSTEE_IS_SID;
+ eas[crt].Trustee.TrusteeType = TRUSTEE_IS_UNKNOWN;
+ eas[crt].Trustee.ptstrName = (LPCWSTR) pDenySids[crt - grantSidCount];
+ }
+
+ dwError = BuildSecurityDescriptor(
+ &owner,
+ &group,
+ crt,
+ eas,
+ 0, // cCountOfAuditEntries
+ NULL, // pListOfAuditEntries
+ NULL, // pOldSD
+ &cbSD,
+ &pTempSD);
+ if (ERROR_SUCCESS != dwError) {
+ goto done;
+ }
+
+ *pSD = pTempSD;
+ pTempSD = NULL;
+
+ if (IsDebuggerPresent()) {
+ ConvertSecurityDescriptorToStringSecurityDescriptor(*pSD,
+ SDDL_REVISION_1,
+ DACL_SECURITY_INFORMATION,
+ &lpszSD,
+ &cchSD);
+ LogDebugMessage(L"pSD: %.*s\n", cchSD, lpszSD);
+ }
+
+done:
+ if (pTokenUser) LocalFree(pTokenUser);
+ if (INVALID_HANDLE_VALUE != hToken) CloseHandle(hToken);
+ if (lpszSD) LocalFree(lpszSD);
+ if (pTempSD) LocalFree(pTempSD);
+ return dwError;
+}
+
+
+//----------------------------------------------------------------------------
+// Function: MIDL_user_allocate
+//
+// Description:
+// Hard-coded function name used by RPC midl code for allocations
+//
+// Notes:
+// Must match the de-allocation mechanism used in MIDL_user_free
+//
+void __RPC_FAR * __RPC_USER MIDL_user_allocate(size_t len)
+{
+ return LocalAlloc(LPTR, len);
+}
+
+ //----------------------------------------------------------------------------
+ // Function: MIDL_user_free
+ //
+ // Description:
+ // Hard-coded function name used by RPC midl code for deallocations
+ //
+ // NoteS:
+ // Must match the allocation mechanism used in MIDL_user_allocate
+ //
+void __RPC_USER MIDL_user_free(void __RPC_FAR * ptr)
+{
+ LocalFree(ptr);
+}
+
diff --git hadoop-common-project/hadoop-common/src/main/winutils/libwinutils.vcxproj hadoop-common-project/hadoop-common/src/main/winutils/libwinutils.vcxproj
index fc0519d..63d936b 100644
--- hadoop-common-project/hadoop-common/src/main/winutils/libwinutils.vcxproj
+++ hadoop-common-project/hadoop-common/src/main/winutils/libwinutils.vcxproj
@@ -160,11 +160,22 @@
+
+
+
+
+
+
+
+ true
+ X64
+
+
diff --git hadoop-common-project/hadoop-common/src/main/winutils/main.c hadoop-common-project/hadoop-common/src/main/winutils/main.c
index 0f40774..e43b6ac 100644
--- hadoop-common-project/hadoop-common/src/main/winutils/main.c
+++ hadoop-common-project/hadoop-common/src/main/winutils/main.c
@@ -67,6 +67,10 @@ int wmain(__in int argc, __in_ecount(argc) wchar_t* argv[])
{
return SystemInfo();
}
+ else if (wcscmp(L"service", cmd) == 0)
+ {
+ return RunService(argc - 1, argv + 1);
+ }
else if (wcscmp(L"help", cmd) == 0)
{
Usage(argv[0]);
@@ -119,5 +123,9 @@ The available commands and their usages are:\n\n", program);
fwprintf(stdout, L"%-15s%s\n\n", L"task", L"Task operations.");
TaskUsage();
+
+ fwprintf(stdout, L"%-15s%s\n\n", L"service", L"Service operations.");
+ ServiceUsage();
+
fwprintf(stdout, L"\n\n");
}
diff --git hadoop-common-project/hadoop-common/src/main/winutils/service.c hadoop-common-project/hadoop-common/src/main/winutils/service.c
new file mode 100644
index 0000000..00f6857
--- /dev/null
+++ hadoop-common-project/hadoop-common/src/main/winutils/service.c
@@ -0,0 +1,883 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with this
+* work for additional information regarding copyright ownership. The ASF
+* licenses this file to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+* License for the specific language governing permissions and limitations under
+* the License.
+*/
+
+#include "winutils.h"
+#include "winutils_msg.h"
+#include
+#include
+#include
+#include
+#include
+#include
+#include "hdpwinutilsvc_h.h"
+
+#pragma comment(lib, "Rpcrt4.lib")
+#pragma comment(lib, "advapi32.lib")
+#pragma comment(lib, "authz.lib")
+
+
+#define NM_WSCE_ALLOWED L"yarn.nodemanager.windows-secure-container-executor.allowed"
+#define NM_WSCE_DENIED L"yarn.nodemanager.windows-secure-container-executor.denied"
+
+#define SERVICE_ACCESS_MASK 0x00000001
+
+SERVICE_STATUS gSvcStatus;
+SERVICE_STATUS_HANDLE gSvcStatusHandle;
+HANDLE ghSvcStopEvent = INVALID_HANDLE_VALUE;
+HANDLE ghWaitObject = INVALID_HANDLE_VALUE;
+HANDLE ghEventLog = INVALID_HANDLE_VALUE;
+BOOL isListenning = FALSE;
+PSECURITY_DESCRIPTOR pAllowedSD = NULL;
+
+VOID SvcError(DWORD dwError);
+VOID WINAPI SvcMain(DWORD dwArg, LPTSTR* lpszArgv);
+DWORD SvcInit();
+DWORD RpcInit();
+DWORD AuthInit();
+VOID ReportSvcStatus( DWORD dwCurrentState,
+ DWORD dwWin32ExitCode,
+ DWORD dwWaitHint);
+VOID WINAPI SvcCtrlHandler( DWORD dwCtrl );
+VOID CALLBACK SvcShutdown(
+ _In_ PVOID lpParameter,
+ _In_ BOOLEAN TimerOrWaitFired);
+
+#define CHECK_ERROR_DONE(status, expected, category, message) \
+ if (status != expected) { \
+ ReportSvcCheckError( \
+ EVENTLOG_ERROR_TYPE, \
+ category, \
+ status, \
+ message); \
+ goto done; \
+ } else { \
+ LogDebugMessage(L"%s: OK\n", message); \
+ }
+
+
+#define CHECK_RPC_STATUS_DONE(status, message) \
+ CHECK_ERROR_DONE(status, RPC_S_OK, SERVICE_CATEGORY, message)
+
+#define CHECK_SVC_STATUS_DONE(status, message) \
+ CHECK_ERROR_DONE(status, ERROR_SUCCESS, SERVICE_CATEGORY, message)
+
+#define CHECK_UNWIND_RPC(rpcCall) { \
+ unwindStatus = rpcCall; \
+ if (RPC_S_OK != unwindStatus) { \
+ ReportSvcCheckError( \
+ EVENTLOG_WARNING_TYPE, \
+ SERVICE_CATEGORY, \
+ unwindStatus, \
+ L#rpcCall); \
+ } \
+ }
+
+
+//----------------------------------------------------------------------------
+// Function: ReportSvcCheckError
+//
+// Description:
+// Reports an error with the system event log and to debugger console (if present)
+//
+void ReportSvcCheckError(WORD type, WORD category, DWORD dwError, LPCWSTR message) {
+ int len;
+ LPWSTR systemMsg = NULL;
+ LPWSTR appMsg = NULL;
+ DWORD dwReportError;
+ LPWSTR reportMsg = NULL;
+ WCHAR hexError[32];
+ LPCWSTR inserts[] = {message, NULL, NULL, NULL};
+ HRESULT hr;
+
+ hr = StringCbPrintf(hexError, sizeof(hexError), TEXT("%x"), dwError);
+ if (SUCCEEDED(hr)) {
+ inserts[1] = hexError;
+ }
+ else {
+ inserts[1] = L"(Failed to format dwError as string)";
+ }
+
+ len = FormatMessageW(
+ FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM,
+ NULL, dwError,
+ MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
+ (LPWSTR)&systemMsg, 0, NULL);
+
+ if (len) {
+ inserts[2] = systemMsg;
+ }
+ else {
+ inserts[2] = L"(Failed to get the system error message)";
+ }
+
+ LogDebugMessage(L"%s:%d %.*s\n", message, dwError, len, systemMsg);
+
+ if (INVALID_HANDLE_VALUE != ghEventLog) {
+ if (!ReportEvent(ghEventLog, type, category, MSG_CHECK_ERROR,
+ NULL, // lpUserSid
+ (WORD) 3, // wNumStrings
+ (DWORD) 0, // dwDataSize
+ inserts, // *lpStrings
+ NULL // lpRawData
+ )) {
+ // We tried to report and failed. Send to dbg.
+ dwReportError = GetLastError();
+ len = FormatMessageW(
+ FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM,
+ NULL, dwReportError,
+ MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
+ (LPWSTR)&reportMsg, 0, NULL);
+ LogDebugMessage(L"ReportEvent: Error:%d %.*s\n", dwReportError, reportMsg);
+ }
+ };
+
+ if (NULL != systemMsg) LocalFree(systemMsg);
+ if (NULL != reportMsg) LocalFree(reportMsg);
+}
+
+
+VOID ReportSvcMessage(WORD type, WORD category, DWORD msgId) {
+ DWORD dwError;
+
+ if (INVALID_HANDLE_VALUE != ghEventLog) {
+ if (!ReportEvent(ghEventLog, type, category, msgId,
+ NULL, // lpUserSid
+ (WORD) 0, // wNumStrings
+ (DWORD) 0, // dwDataSize
+ NULL, // *lpStrings
+ NULL // lpRawData
+ )) {
+ // We tried to report and failed but debugger is attached. Send to dbg.
+ dwError = GetLastError();
+ LogDebugMessage(L"ReportEvent: error %d\n", dwError);
+ }
+ }
+}
+
+
+//----------------------------------------------------------------------------
+// Function: RunService
+//
+// Description:
+// Registers with NT SCM and starts the service
+//
+// Returns:
+// ERROR_SUCCESS: On success
+// Error code otherwise: otherwise
+DWORD RunService(__in int argc, __in_ecount(argc) wchar_t *argv[])
+{
+ DWORD dwError= ERROR_SUCCESS;
+ int argStart = 1;
+
+ static const SERVICE_TABLE_ENTRY serviceTable[] = {
+ { SVCNAME, (LPSERVICE_MAIN_FUNCTION) SvcMain },
+ { NULL, NULL }
+ };
+
+ dwError = AuthInit();
+ if (ERROR_SUCCESS != dwError) {
+ SvcError(dwError);
+ goto done;
+ }
+
+ ghEventLog = RegisterEventSource(NULL, SVCNAME);
+ if (NULL == ghEventLog) {
+ dwError = GetLastError();
+ CHECK_SVC_STATUS_DONE(dwError, L"RegisterEventSource")
+ }
+
+ if (!StartServiceCtrlDispatcher(serviceTable)) {
+ dwError = GetLastError();
+ CHECK_SVC_STATUS_DONE(dwError, L"StartServiceCtrlDispatcher")
+ }
+
+done:
+ return dwError;
+}
+
+//----------------------------------------------------------------------------
+// Function: SvcMain
+//
+// Description:
+// Service main entry point.
+//
+VOID WINAPI SvcMain() {
+ DWORD dwError = ERROR_SUCCESS;
+
+ gSvcStatusHandle = RegisterServiceCtrlHandler(
+ SVCNAME,
+ SvcCtrlHandler);
+ if( !gSvcStatusHandle ) {
+ dwError = GetLastError();
+ CHECK_SVC_STATUS_DONE(dwError, L"RegisterServiceCtrlHandler")
+ }
+
+ // These SERVICE_STATUS members remain as set here
+ gSvcStatus.dwServiceType = SERVICE_WIN32_OWN_PROCESS;
+ gSvcStatus.dwServiceSpecificExitCode = 0;
+
+ // Report initial status to the SCM
+ ReportSvcStatus( SERVICE_START_PENDING, NO_ERROR, 3000 );
+
+ // Perform service-specific initialization and work.
+ dwError = SvcInit();
+
+done:
+ return;
+}
+
+//----------------------------------------------------------------------------
+// Function: SvcInit
+//
+// Description:
+// Initializes the service.
+//
+DWORD SvcInit() {
+ DWORD dwError = ERROR_SUCCESS;
+
+ dwError = EnablePrivilege(SE_DEBUG_NAME);
+ if( dwError != ERROR_SUCCESS ) {
+ goto done;
+ }
+
+ // The recommended way to shutdown the service is to use an event
+ // and attach a callback with RegisterWaitForSingleObject
+ //
+ ghSvcStopEvent = CreateEvent(
+ NULL, // default security attributes
+ TRUE, // manual reset event
+ FALSE, // not signaled
+ NULL); // no name
+
+ if ( ghSvcStopEvent == NULL)
+ {
+ dwError = GetLastError();
+ ReportSvcCheckError(EVENTLOG_ERROR_TYPE, SERVICE_CATEGORY,
+ dwError, L"CreateEvent");
+ ReportSvcStatus( SERVICE_STOPPED, dwError, 0 );
+ goto done;
+ }
+
+ if (!RegisterWaitForSingleObject (&ghWaitObject,
+ ghSvcStopEvent,
+ SvcShutdown,
+ NULL,
+ INFINITE,
+ WT_EXECUTEONLYONCE)) {
+ dwError = GetLastError();
+ ReportSvcCheckError(EVENTLOG_ERROR_TYPE, SERVICE_CATEGORY,
+ dwError, L"RegisterWaitForSingleObject");
+ CloseHandle(ghSvcStopEvent);
+ ReportSvcStatus( SERVICE_STOPPED, dwError, 0 );
+ goto done;
+ }
+
+ // Report running status when initialization is complete.
+ ReportSvcStatus( SERVICE_RUNNING, NO_ERROR, 0 );
+
+ dwError = RpcInit();
+
+done:
+ return dwError;
+}
+
+//----------------------------------------------------------------------------
+// Function: RpcAuthorizeCallback
+//
+// Description:
+// RPC Authorization callback.
+//
+// Returns:
+// RPC_S_OK for access authorized
+// RPC_S_ACCESS_DENIED for access denied
+//
+RPC_STATUS CALLBACK RpcAuthorizeCallback (
+ RPC_IF_HANDLE hInterface,
+ void* pContext)
+{
+ RPC_STATUS status,
+ unwindStatus,
+ authStatus = RPC_S_ACCESS_DENIED;
+ DWORD dwError;
+ LUID luidReserved2;
+ AUTHZ_ACCESS_REQUEST request;
+ AUTHZ_ACCESS_REPLY reply;
+ AUTHZ_CLIENT_CONTEXT_HANDLE hClientContext = NULL;
+ DWORD authError = ERROR_SUCCESS;
+ DWORD saclResult = 0;
+ ACCESS_MASK grantedMask = 0;
+
+ ZeroMemory(&luidReserved2, sizeof(luidReserved2));
+ ZeroMemory(&request, sizeof(request));
+ ZeroMemory(&reply, sizeof(reply));
+
+ status = RpcGetAuthorizationContextForClient(NULL,
+ FALSE, // ImpersonateOnReturn
+ NULL, // Reserved1
+ NULL, // pExpirationTime
+ luidReserved2, // Reserved2
+ 0, // Reserved3
+ NULL, // Reserved4
+ &hClientContext);
+ CHECK_RPC_STATUS_DONE(status, L"RpcGetAuthorizationContextForClient");
+
+ request.DesiredAccess = MAXIMUM_ALLOWED;
+ reply.Error = &authError;
+ reply.SaclEvaluationResults = &saclResult;
+ reply.ResultListLength = 1;
+ reply.GrantedAccessMask = &grantedMask;
+
+ if (!AuthzAccessCheck(
+ 0,
+ hClientContext,
+ &request,
+ NULL, // AuditEvent
+ pAllowedSD,
+ NULL, // OptionalSecurityDescriptorArray
+ 0, // OptionalSecurityDescriptorCount
+ &reply,
+ NULL // phAccessCheckResults
+ )) {
+ dwError = GetLastError();
+ CHECK_SVC_STATUS_DONE(dwError, L"AuthzAccessCheck");
+ }
+
+ LogDebugMessage(L"AutzAccessCheck: Error:%d sacl:%d access:%d\n",
+ authError, saclResult, grantedMask);
+ if (authError == ERROR_SUCCESS && (grantedMask & SERVICE_ACCESS_MASK)) {
+ authStatus = RPC_S_OK;
+ }
+
+done:
+ if (NULL != hClientContext) CHECK_UNWIND_RPC(RpcFreeAuthorizationContext(&hClientContext));
+ return authStatus;
+}
+
+//----------------------------------------------------------------------------
+// Function: AuthInit
+//
+// Description:
+// Initializes the authorization structures (security descriptor).
+//
+// Notes:
+// This is called from RunService solely for debugging purposed
+// so that it can be tested by wimply running winutil service from CLI (no SCM)
+//
+DWORD AuthInit() {
+ DWORD dwError = ERROR_SUCCESS;
+ int count = 0;
+ int crt = 0;
+ int len = 0;
+ LPCWSTR value = NULL;
+ WCHAR** tokens = NULL;
+ LPWSTR lpszSD = NULL;
+ ULONG cchSD = 0;
+ DWORD dwBufferSize = 0;
+ int allowedCount = 0;
+ PSID* allowedSids = NULL;
+
+
+ dwError = GetConfigValue(NM_WSCE_ALLOWED, &len, &value);
+ CHECK_SVC_STATUS_DONE(dwError, L"GetConfigValue");
+
+ if (0 == len) {
+ CHECK_SVC_STATUS_DONE(ERROR_BAD_CONFIGURATION, NM_WSCE_ALLOWED);
+ }
+
+ dwError = SplitStringIgnoreSpaceW(len, value, L',', &count, &tokens);
+ CHECK_SVC_STATUS_DONE(dwError, L"SplitStringIgnoreSpaceW");
+
+ allowedSids = (PSID*) LocalAlloc(LPTR, sizeof(PSID) * count);
+ for (crt = 0; crt < count; ++crt) {
+ dwError = GetSidFromAcctNameW(tokens[crt], &allowedSids[crt]);
+ CHECK_SVC_STATUS_DONE(dwError, L"GetSidFromAcctNameW");
+ }
+
+ allowedCount = count;
+
+ dwError = BuildServiceSecurityDescriptor(SERVICE_ACCESS_MASK,
+ allowedCount, allowedSids, 0, NULL, &pAllowedSD);
+ CHECK_SVC_STATUS_DONE(dwError, L"BuildServiceSecurityDescriptor");
+
+done:
+ if (lpszSD) LocalFree(lpszSD);
+ if (value) LocalFree(value);
+ if (tokens) LocalFree(tokens);
+ return dwError;
+}
+
+//----------------------------------------------------------------------------
+// Function: RpcInit
+//
+// Description:
+// Initializes the RPC infrastructure and starts the RPC listenner.
+//
+DWORD RpcInit() {
+ RPC_STATUS status;
+ DWORD dwError;
+
+ status = RpcServerUseProtseqIf(SVCBINDING,
+ RPC_C_LISTEN_MAX_CALLS_DEFAULT,
+ Hdpwinutilsvc_v1_0_s_ifspec,
+ NULL);
+ if (RPC_S_OK != status) {
+ ReportSvcCheckError(EVENTLOG_ERROR_TYPE, SERVICE_CATEGORY,
+ status, L"RpcServerUseProtseqIf");
+ SvcError(status);
+ dwError = status;
+ goto done;
+ }
+
+ status = RpcServerRegisterIfEx(Hdpwinutilsvc_v1_0_s_ifspec,
+ NULL, // MgrTypeUuid
+ NULL, // MgrEpv
+ RPC_IF_ALLOW_LOCAL_ONLY, // Flags
+ RPC_C_LISTEN_MAX_CALLS_DEFAULT, // Max calls
+ RpcAuthorizeCallback); // Auth callback
+
+ if (RPC_S_OK != status) {
+ ReportSvcCheckError(EVENTLOG_ERROR_TYPE, SERVICE_CATEGORY,
+ status, L"RpcServerRegisterIfEx");
+ SvcError(status);
+ dwError = status;
+ goto done;
+ }
+
+ status = RpcServerListen(1, RPC_C_LISTEN_MAX_CALLS_DEFAULT, TRUE);
+ if (RPC_S_ALREADY_LISTENING == status) {
+ ReportSvcCheckError(EVENTLOG_WARNING_TYPE, SERVICE_CATEGORY,
+ status, L"RpcServerListen");
+ }
+ else if (RPC_S_OK != status) {
+ ReportSvcCheckError(EVENTLOG_ERROR_TYPE, SERVICE_CATEGORY,
+ status, L"RpcServerListen");
+ SvcError(status);
+ dwError = status;
+ goto done;
+ }
+
+ isListenning = TRUE;
+
+ ReportSvcMessage(EVENTLOG_INFORMATION_TYPE, SERVICE_CATEGORY,
+ MSG_RPC_SERVICE_HAS_STARTED);
+
+done:
+ return dwError;
+}
+
+//----------------------------------------------------------------------------
+// Function: RpcStop
+//
+// Description:
+// Tears down the RPC infrastructure and stops the RPC listenner.
+//
+VOID RpcStop() {
+ RPC_STATUS status;
+
+ if (isListenning) {
+
+ status = RpcMgmtStopServerListening(NULL);
+ isListenning = FALSE;
+
+ if (RPC_S_OK != status) {
+ ReportSvcCheckError(EVENTLOG_WARNING_TYPE, SERVICE_CATEGORY,
+ status, L"RpcMgmtStopServerListening");
+ }
+
+ ReportSvcMessage(EVENTLOG_INFORMATION_TYPE, SERVICE_CATEGORY,
+ MSG_RPC_SERVICE_HAS_STOPPED);
+ }
+}
+
+//----------------------------------------------------------------------------
+// Function: CleanupHandles
+//
+// Description:
+// Cleans up the global service handles.
+//
+VOID CleanupHandles() {
+ if (INVALID_HANDLE_VALUE != ghWaitObject) {
+ UnregisterWait(ghWaitObject);
+ ghWaitObject = INVALID_HANDLE_VALUE;
+ }
+ if (INVALID_HANDLE_VALUE != ghSvcStopEvent) {
+ CloseHandle(ghSvcStopEvent);
+ ghSvcStopEvent = INVALID_HANDLE_VALUE;
+ }
+ if (INVALID_HANDLE_VALUE != ghEventLog) {
+ DeregisterEventSource(ghEventLog);
+ ghEventLog = INVALID_HANDLE_VALUE;
+ }
+}
+
+//----------------------------------------------------------------------------
+// Function: SvcError
+//
+// Description:
+// Aborts the startup sequence. Reports error, stops RPC, cleans up globals.
+//
+VOID SvcError(DWORD dwError) {
+ RpcStop();
+ CleanupHandles();
+ ReportSvcStatus( SERVICE_STOPPED, dwError, 0 );
+}
+
+//----------------------------------------------------------------------------
+// Function: SvcShutdown
+//
+// Description:
+// Callback when the shutdown event is signaled. Stops RPC, cleans up globals.
+//
+VOID CALLBACK SvcShutdown(
+ _In_ PVOID lpParameter,
+ _In_ BOOLEAN TimerOrWaitFired) {
+ RpcStop();
+ CleanupHandles();
+ ReportSvcStatus( SERVICE_STOPPED, NO_ERROR, 0 );
+}
+
+//----------------------------------------------------------------------------
+// Function: SvcCtrlHandler
+//
+// Description:
+// Callback from SCM for for service events (signals).
+//
+// Notes:
+// Shutdown is indirect, we set her the STOP_PENDING state and signal the stop event.
+// Signaling the event invokes SvcShutdown which completes the shutdown.
+// This two staged approach allows the SCM handler to complete fast,
+// not blocking the SCM big fat global lock.
+//
+VOID WINAPI SvcCtrlHandler( DWORD dwCtrl )
+{
+ // Handle the requested control code.
+
+ switch(dwCtrl)
+ {
+ case SERVICE_CONTROL_STOP:
+ ReportSvcStatus(SERVICE_STOP_PENDING, NO_ERROR, 0);
+
+ // Signal the service to stop.
+ SetEvent(ghSvcStopEvent);
+
+ return;
+
+ default:
+ break;
+ }
+
+}
+
+//----------------------------------------------------------------------------
+// Function: ReportSvcStatus
+//
+// Description:
+// Updates the service status with the SCM.
+//
+VOID ReportSvcStatus( DWORD dwCurrentState,
+ DWORD dwWin32ExitCode,
+ DWORD dwWaitHint)
+{
+ static DWORD dwCheckPoint = 1;
+ DWORD dwError;
+
+ // Fill in the SERVICE_STATUS structure.
+
+ gSvcStatus.dwCurrentState = dwCurrentState;
+ gSvcStatus.dwWin32ExitCode = dwWin32ExitCode;
+ gSvcStatus.dwWaitHint = dwWaitHint;
+
+ if (dwCurrentState == SERVICE_START_PENDING)
+ gSvcStatus.dwControlsAccepted = 0;
+ else gSvcStatus.dwControlsAccepted = SERVICE_ACCEPT_STOP;
+
+ if ( (dwCurrentState == SERVICE_RUNNING) ||
+ (dwCurrentState == SERVICE_STOPPED) )
+ gSvcStatus.dwCheckPoint = 0;
+ else gSvcStatus.dwCheckPoint = dwCheckPoint++;
+
+ // Report the status of the service to the SCM.
+ if (!SetServiceStatus( gSvcStatusHandle, &gSvcStatus)) {
+ dwError = GetLastError();
+ ReportSvcCheckError(EVENTLOG_WARNING_TYPE, SERVICE_CATEGORY,
+ dwError, L"SetServiceStatus");
+ };
+}
+
+//----------------------------------------------------------------------------
+// Function: WinutilsCreateProcessAsUser
+//
+// Description:
+// The RPC midl declared function implementation
+//
+// Returns:
+// ERROR_SUCCESS: On success
+// Error code otherwise: otherwise
+//
+// Notes:
+// This is the entry point when the NodeManager does the RPC call
+// Note that the RPC call does not do any S4U work. Is simply spawns (suspended) wintutils
+// using the right command line and the handles over the spwaned process to the NM
+// The actual S4U work occurs in the spawned process, run and monitored by the NM
+//
+error_status_t WinutilsCreateProcessAsUser(
+ /* [in] */ int nmPid,
+ /* [in] */ CREATE_PROCESS_REQUEST *request,
+ /* [out] */ CREATE_PROCESS_RESPONSE **response) {
+
+ DWORD dwError = ERROR_SUCCESS;
+ LPCWSTR inserts[] = {request->cwd, request->jobName, request->user, request->pidFile, request->cmdLine, NULL};
+ WCHAR winutilsPath[MAX_PATH];
+ WCHAR fullCmdLine[32768];
+ HANDLE taskStdInRd = INVALID_HANDLE_VALUE, taskStdInWr = INVALID_HANDLE_VALUE,
+ taskStdOutRd = INVALID_HANDLE_VALUE, taskStdOutWr = INVALID_HANDLE_VALUE,
+ taskStdErrRd = INVALID_HANDLE_VALUE, taskStdErrWr = INVALID_HANDLE_VALUE,
+ hNmProcess = INVALID_HANDLE_VALUE,
+ hDuplicateProcess = INVALID_HANDLE_VALUE,
+ hDuplicateThread = INVALID_HANDLE_VALUE,
+ hDuplicateStdIn = INVALID_HANDLE_VALUE,
+ hDuplicateStdOut = INVALID_HANDLE_VALUE,
+ hDuplicateStdErr = INVALID_HANDLE_VALUE,
+ hSelfProcess = INVALID_HANDLE_VALUE;
+ BOOL fMustCleanupProcess = FALSE;
+
+ HRESULT hr;
+ STARTUPINFO si;
+ PROCESS_INFORMATION pi;
+ SECURITY_ATTRIBUTES saTaskStdInOutErr;
+
+ ZeroMemory( &si, sizeof(si) );
+ si.cb = sizeof(si);
+ ZeroMemory( &pi, sizeof(pi) );
+ pi.hProcess = INVALID_HANDLE_VALUE;
+ pi.hThread = INVALID_HANDLE_VALUE;
+ ZeroMemory( &saTaskStdInOutErr, sizeof(saTaskStdInOutErr));
+
+ // NB: GetCurrentProcess returns a pseudo-handle that just so happens
+ // has the value -1, ie. INVALID_HANDLE_VALUE. It cannot fail.
+ //
+ hSelfProcess = GetCurrentProcess();
+
+ hNmProcess = OpenProcess(PROCESS_DUP_HANDLE, FALSE, nmPid);
+ if (NULL == hNmProcess) {
+ dwError = GetLastError();
+ goto done;
+ }
+
+ GetModuleFileName(NULL, winutilsPath, sizeof(winutilsPath)/sizeof(WCHAR));
+ dwError = GetLastError(); // Always check after GetModuleFileName for ERROR_INSSUFICIENT_BUFFER
+ if (dwError) {
+ ReportSvcCheckError(EVENTLOG_ERROR_TYPE, SERVICE_CATEGORY,
+ dwError, L"GetModuleFileName");
+ goto done;
+ }
+
+ // NB. We can call CreateProcess("wintuls","task create ...") or we can call
+ // CreateProcess(NULL, "winutils task create"). Only the second form passes "task" as
+ // argv[1], as expected by main. First form passes "task" as argv[0] and main fails.
+
+ hr = StringCbPrintf(fullCmdLine, sizeof(fullCmdLine), L"\"%s\" task createAsUser %ls %ls %ls %ls",
+ winutilsPath,
+ request->jobName, request->user, request->pidFile, request->cmdLine);
+ if (FAILED(hr)) {
+ ReportSvcCheckError(EVENTLOG_ERROR_TYPE, SERVICE_CATEGORY,
+ hr, L"StringCbPrintf:fullCmdLine");
+ goto done;
+ }
+
+ LogDebugMessage(L"[%ls]: %ls %ls\n", request->cwd, winutilsPath, fullCmdLine);
+
+ // stdin/stdout/stderr redirection is handled here
+ // We create 3 anonimous named pipes.
+ // Security attributes are required so that the handles can be inherited.
+ // We assign one end of the pipe to the process (stdin gets a read end, stdout gets a write end)
+ // We then duplicate the other end in the NM process, and we close our own handle
+ // Finally we return the duplicate handle values to the NM
+ // The NM will attach Java file dscriptors to the duplicated handles and
+ // read/write them as ordinary Java InputStream/OutputStream objects
+
+ si.dwFlags |= STARTF_USESTDHANDLES;
+
+ saTaskStdInOutErr.nLength = sizeof(SECURITY_ATTRIBUTES);
+ saTaskStdInOutErr.bInheritHandle = TRUE;
+ saTaskStdInOutErr.lpSecurityDescriptor = NULL;
+
+ if (!CreatePipe(&taskStdInRd, &taskStdInWr, &saTaskStdInOutErr, 0)) {
+ dwError = GetLastError();
+ goto done;
+ }
+ if (!SetHandleInformation(taskStdInWr, HANDLE_FLAG_INHERIT, FALSE)) {
+ dwError = GetLastError();
+ goto done;
+ }
+ si.hStdInput = taskStdInRd;
+
+ if (!CreatePipe(&taskStdOutRd, &taskStdOutWr, &saTaskStdInOutErr, 0)) {
+ dwError = GetLastError();
+ goto done;
+ }
+ if (!SetHandleInformation(taskStdOutRd, HANDLE_FLAG_INHERIT, FALSE)) {
+ dwError = GetLastError();
+ goto done;
+ }
+ si.hStdOutput = taskStdOutWr;
+
+ if (!CreatePipe(&taskStdErrRd, &taskStdErrWr, &saTaskStdInOutErr, 0)) {
+ dwError = GetLastError();
+ goto done;
+ }
+ if (!SetHandleInformation(taskStdErrRd, HANDLE_FLAG_INHERIT, FALSE)) {
+ dwError = GetLastError();
+ goto done;
+ }
+ si.hStdError = taskStdErrWr;
+
+ if (!CreateProcess(
+ NULL, // lpApplicationName,
+ fullCmdLine, // lpCommandLine,
+ NULL, // lpProcessAttributes,
+ NULL, // lpThreadAttributes,
+ TRUE, // bInheritHandles,
+ CREATE_SUSPENDED, // dwCreationFlags,
+ NULL, // lpEnvironment,
+ request->cwd, // lpCurrentDirectory,
+ &si, // lpStartupInfo
+ &pi)) { // lpProcessInformation
+
+ dwError = GetLastError();
+ ReportSvcCheckError(EVENTLOG_ERROR_TYPE, SERVICE_CATEGORY,
+ dwError, L"CreateProcess");
+ goto done;
+ }
+
+ fMustCleanupProcess = TRUE;
+
+ LogDebugMessage(L"CreateProcess: pid:%x\n", pi.dwProcessId);
+
+ if (!DuplicateHandle(hSelfProcess, pi.hProcess, hNmProcess,
+ &hDuplicateProcess, 0, FALSE, DUPLICATE_SAME_ACCESS)) {
+ dwError = GetLastError();
+ LogDebugMessage(L"failed: pi.hProcess\n");
+ goto done;
+ }
+
+ if (!DuplicateHandle(hSelfProcess, pi.hThread, hNmProcess,
+ &hDuplicateThread, 0, FALSE, DUPLICATE_SAME_ACCESS)) {
+ dwError = GetLastError();
+ LogDebugMessage(L"failed: pi.hThread\n");
+ goto done;
+ }
+
+ if (!DuplicateHandle(hSelfProcess, taskStdInWr, hNmProcess,
+ &hDuplicateStdIn, 0, FALSE, DUPLICATE_SAME_ACCESS)) {
+ dwError = GetLastError();
+ LogDebugMessage(L"failed: taskStdInWr\n");
+ goto done;
+ }
+
+ if (!DuplicateHandle(hSelfProcess, taskStdOutRd, hNmProcess,
+ &hDuplicateStdOut, 0, FALSE, DUPLICATE_SAME_ACCESS)) {
+ dwError = GetLastError();
+ LogDebugMessage(L"failed: taskStdOutRd\n");
+ goto done;
+ }
+
+ if (!DuplicateHandle(hSelfProcess, taskStdErrRd, hNmProcess,
+ &hDuplicateStdErr, 0, FALSE, DUPLICATE_SAME_ACCESS)) {
+ dwError = GetLastError();
+ LogDebugMessage(L"failed: taskStdErrRd\n");
+ goto done;
+ }
+
+ *response = (CREATE_PROCESS_RESPONSE*) MIDL_user_allocate(sizeof(CREATE_PROCESS_RESPONSE));
+ if (NULL == *response) {
+ dwError = ERROR_OUTOFMEMORY;
+ LogDebugMessage(L"Failed to allocate CREATE_PROCESS_RESPONSE* response\n");
+ goto done;
+ }
+
+ // We're now transfering ownership of the duplicated handles to the caller
+ // If the RPC call fails *after* this point the handles are leaked inside the NM process
+
+ (*response)->hProcess = hDuplicateProcess;
+ (*response)->hThread = hDuplicateThread;
+ (*response)->hStdIn = hDuplicateStdIn;
+ (*response)->hStdOut = hDuplicateStdOut;
+ (*response)->hStdErr = hDuplicateStdErr;
+
+ fMustCleanupProcess = FALSE;
+
+done:
+
+ if (fMustCleanupProcess) {
+ LogDebugMessage(L"Cleaning process: %d due to error:%d\n", pi.dwProcessId, dwError);
+ TerminateProcess(pi.hProcess, EXIT_FAILURE);
+
+ // cleanup the duplicate handles inside the NM.
+
+ if (INVALID_HANDLE_VALUE != hDuplicateProcess) {
+ DuplicateHandle(hNmProcess, hDuplicateProcess, NULL, NULL, 0, FALSE, DUPLICATE_CLOSE_SOURCE);
+ }
+ if (INVALID_HANDLE_VALUE != hDuplicateThread) {
+ DuplicateHandle(hNmProcess, hDuplicateThread, NULL, NULL, 0, FALSE, DUPLICATE_CLOSE_SOURCE);
+ }
+ if (INVALID_HANDLE_VALUE != hDuplicateStdIn) {
+ DuplicateHandle(hNmProcess, hDuplicateStdIn, NULL, NULL, 0, FALSE, DUPLICATE_CLOSE_SOURCE);
+ }
+ if (INVALID_HANDLE_VALUE != hDuplicateStdOut) {
+ DuplicateHandle(hNmProcess, hDuplicateStdOut, NULL, NULL, 0, FALSE, DUPLICATE_CLOSE_SOURCE);
+ }
+ if (INVALID_HANDLE_VALUE != hDuplicateStdErr) {
+ DuplicateHandle(hNmProcess, hDuplicateStdErr, NULL, NULL, 0, FALSE, DUPLICATE_CLOSE_SOURCE);
+ }
+ }
+
+ if (INVALID_HANDLE_VALUE != hSelfProcess) CloseHandle(hSelfProcess);
+ if (INVALID_HANDLE_VALUE != hNmProcess) CloseHandle(hNmProcess);
+ if (INVALID_HANDLE_VALUE != taskStdInRd) CloseHandle(taskStdInRd);
+ if (INVALID_HANDLE_VALUE != taskStdInWr) CloseHandle(taskStdInWr);
+ if (INVALID_HANDLE_VALUE != taskStdOutRd) CloseHandle(taskStdOutRd);
+ if (INVALID_HANDLE_VALUE != taskStdOutWr) CloseHandle(taskStdOutWr);
+ if (INVALID_HANDLE_VALUE != taskStdErrRd) CloseHandle(taskStdErrRd);
+ if (INVALID_HANDLE_VALUE != taskStdErrWr) CloseHandle(taskStdErrWr);
+
+
+ // This is closing our own process/thread handles.
+ // If the transfer was succesfull the NM has its own duplicates (if any)
+ if (INVALID_HANDLE_VALUE != pi.hThread) CloseHandle(pi.hThread);
+ if (INVALID_HANDLE_VALUE != pi.hProcess) CloseHandle(pi.hProcess);
+
+ return dwError;
+}
+
+//----------------------------------------------------------------------------
+// Function: ServiceUsage
+//
+// Description:
+// Prints the CLI arguments for service command.
+//
+void ServiceUsage()
+{
+ fwprintf(stdout, L"\
+ Usage: service\n\
+ Starts the impersonation helper service.\n\
+ This should be called from the SCM.\n\
+ The impersonation helper service must run as a high privileged account (LocalSystem)\n\
+ and is used by the NodeManager to spawn secure containers.\n");
+}
+
+
diff --git hadoop-common-project/hadoop-common/src/main/winutils/task.c hadoop-common-project/hadoop-common/src/main/winutils/task.c
index 520184b..1e2374d 100644
--- hadoop-common-project/hadoop-common/src/main/winutils/task.c
+++ hadoop-common-project/hadoop-common/src/main/winutils/task.c
@@ -19,10 +19,18 @@
#include
#include
#include
+#include
#define PSAPI_VERSION 1
#pragma comment(lib, "psapi.lib")
+
+#define NM_WSCE_IMPERSONATE_ALLOWED L"yarn.nodemanager.windows-secure-container-executor.impersonate.allowed"
+#define NM_WSCE_IMPERSONATE_DENIED L"yarn.nodemanager.windows-secure-container-executor.impersonate.denied"
+
+// The S4U impersonation access check mask. Arbitrary value (we use 1 for the service access check)
+#define SERVICE_IMPERSONATE_MASK 0x00000002
+
#define ERROR_TASK_NOT_ALIVE 1
// This exit code for killed processes is compatible with Unix, where a killed
@@ -104,6 +112,221 @@ static BOOL ParseCommandLine(__in int argc,
return FALSE;
}
+
+//----------------------------------------------------------------------------
+// Function: BuildImpersonateSecurityDescriptor
+//
+// Description:
+// Builds the security descriptor for the S4U impersonation permissions
+// This describes what users can be impersonated and what not
+//
+// Returns:
+// ERROR_SUCCESS: On success
+// GetLastError: otherwise
+//
+DWORD BuildImpersonateSecurityDescriptor(__out PSECURITY_DESCRIPTOR* ppSD) {
+ DWORD dwError = ERROR_SUCCESS;
+ size_t countAllowed = 0;
+ PSID* allowedSids = NULL;
+ size_t countDenied = 0;
+ PSID* deniedSids = NULL;
+ LPCWSTR value = NULL;
+ WCHAR** tokens = NULL;
+ size_t len = 0;
+ size_t count = 0;
+ int crt = 0;
+ PSECURITY_DESCRIPTOR pSD = NULL;
+
+ dwError = GetConfigValue(NM_WSCE_IMPERSONATE_ALLOWED, &len, &value);
+ if (dwError) {
+ ReportErrorCode(L"GetConfigValue:1", dwError);
+ goto done;
+ }
+
+ if (0 == len) {
+ dwError = ERROR_BAD_CONFIGURATION;
+ ReportErrorCode(L"GetConfigValue:2", dwError);
+ goto done;
+ }
+
+ dwError = SplitStringIgnoreSpaceW(len, value, L',', &count, &tokens);
+ if (dwError) {
+ ReportErrorCode(L"SplitStringIgnoreSpaceW:1", dwError);
+ goto done;
+ }
+
+ allowedSids = LocalAlloc(LPTR, sizeof(PSID) * count);
+ if (NULL == allowedSids) {
+ dwError = GetLastError();
+ ReportErrorCode(L"LocalAlloc:1", dwError);
+ goto done;
+ }
+
+ for(crt = 0; crt < count; ++crt) {
+ dwError = GetSidFromAcctNameW(tokens[crt], &allowedSids[crt]);
+ if (dwError) {
+ ReportErrorCode(L"GetSidFromAcctNameW:1", dwError);
+ goto done;
+ }
+ }
+ countAllowed = count;
+
+ LocalFree(tokens);
+ tokens = NULL;
+
+ LocalFree(value);
+ value = NULL;
+
+ dwError = GetConfigValue(NM_WSCE_IMPERSONATE_DENIED, &len, &value);
+ if (dwError) {
+ ReportErrorCode(L"GetConfigValue:3", dwError);
+ goto done;
+ }
+
+ if (0 != len) {
+ dwError = SplitStringIgnoreSpaceW(len, value, L',', &count, &tokens);
+ if (dwError) {
+ ReportErrorCode(L"SplitStringIgnoreSpaceW:2", dwError);
+ goto done;
+ }
+
+ deniedSids = LocalAlloc(LPTR, sizeof(PSID) * count);
+ if (NULL == allowedSids) {
+ dwError = GetLastError();
+ ReportErrorCode(L"LocalAlloc:2", dwError);
+ goto done;
+ }
+
+ for(crt = 0; crt < count; ++crt) {
+ dwError = GetSidFromAcctNameW(tokens[crt], &deniedSids[crt]);
+ if (dwError) {
+ ReportErrorCode(L"GetSidFromAcctNameW:2", dwError);
+ goto done;
+ }
+ }
+ countDenied = count;
+ }
+
+ dwError = BuildServiceSecurityDescriptor(
+ SERVICE_IMPERSONATE_MASK,
+ countAllowed, allowedSids,
+ countDenied, deniedSids,
+ &pSD);
+
+ if (dwError) {
+ ReportErrorCode(L"BuildServiceSecurityDescriptor", dwError);
+ goto done;
+ }
+
+ *ppSD = pSD;
+ pSD = NULL;
+
+done:
+ if (pSD) LocalFree(pSD);
+ if (tokens) LocalFree(tokens);
+ if (allowedSids) LocalFree(allowedSids);
+ if (deniedSids) LocalFree(deniedSids);
+ return dwError;
+}
+
+//----------------------------------------------------------------------------
+// Function: ValidateImpersonateAccessCheck
+//
+// Description:
+// Performs the access check for S4U impersonation
+//
+// Returns:
+// ERROR_SUCCESS: On success
+// ERROR_ACCESS_DENIED, GetLastError: otherwise
+//
+DWORD ValidateImpersonateAccessCheck(__in HANDLE logonHandle) {
+ DWORD dwError = ERROR_SUCCESS;
+ PSECURITY_DESCRIPTOR pSD = NULL;
+ LUID luidUnused;
+ AUTHZ_ACCESS_REQUEST request;
+ AUTHZ_ACCESS_REPLY reply;
+ DWORD authError = ERROR_SUCCESS;
+ DWORD saclResult = 0;
+ ACCESS_MASK grantedMask = 0;
+ AUTHZ_RESOURCE_MANAGER_HANDLE hManager = NULL;
+ AUTHZ_CLIENT_CONTEXT_HANDLE hAuthzToken = NULL;
+
+ ZeroMemory(&luidUnused, sizeof(luidUnused));
+ ZeroMemory(&request, sizeof(request));
+ ZeroMemory(&reply, sizeof(reply));
+
+ dwError = BuildImpersonateSecurityDescriptor(&pSD);
+ if (dwError) {
+ ReportErrorCode(L"BuildImpersonateSecurityDescriptor", dwError);
+ goto done;
+ }
+
+ request.DesiredAccess = MAXIMUM_ALLOWED;
+ reply.Error = &authError;
+ reply.SaclEvaluationResults = &saclResult;
+ reply.ResultListLength = 1;
+ reply.GrantedAccessMask = &grantedMask;
+
+ if (!AuthzInitializeResourceManager(
+ AUTHZ_RM_FLAG_NO_AUDIT,
+ NULL, // pfnAccessCheck
+ NULL, // pfnComputeDynamicGroups
+ NULL, // pfnFreeDynamicGroups
+ NULL, // szResourceManagerName
+ &hManager)) {
+ dwError = GetLastError();
+ ReportErrorCode(L"AuthzInitializeResourceManager", dwError);
+ goto done;
+ }
+
+ if (!AuthzInitializeContextFromToken(
+ 0,
+ logonHandle,
+ hManager,
+ NULL, // expiration time
+ luidUnused, // not used
+ NULL, // callback args
+ &hAuthzToken)) {
+ dwError = GetLastError();
+ ReportErrorCode(L"AuthzInitializeContextFromToken", dwError);
+ goto done;
+ }
+
+ if (!AuthzAccessCheck(
+ 0,
+ hAuthzToken,
+ &request,
+ NULL, // AuditEvent
+ pSD,
+ NULL, // OptionalSecurityDescriptorArray
+ 0, // OptionalSecurityDescriptorCount
+ &reply,
+ NULL // phAccessCheckResults
+ )) {
+ dwError = GetLastError();
+ ReportErrorCode(L"AuthzAccessCheck", dwError);
+ goto done;
+ }
+
+ LogDebugMessage(L"AutzAccessCheck: Error:%d sacl:%d access:%d\n",
+ authError, saclResult, grantedMask);
+
+ if (authError != ERROR_SUCCESS) {
+ ReportErrorCode(L"AuthzAccessCheck:REPLY:1", authError);
+ dwError = authError;
+ }
+ else if (!(grantedMask & SERVICE_IMPERSONATE_MASK)) {
+ ReportErrorCode(L"AuthzAccessCheck:REPLY:2", ERROR_ACCESS_DENIED);
+ dwError = ERROR_ACCESS_DENIED;
+ }
+
+done:
+ if (hAuthzToken) AuthzFreeContext(hAuthzToken);
+ if (hManager) AuthzFreeResourceManager(hManager);
+ if (pSD) LocalFree(pSD);
+ return dwError;
+}
+
//----------------------------------------------------------------------------
// Function: CreateTaskImpl
//
@@ -131,6 +354,13 @@ DWORD CreateTaskImpl(__in_opt HANDLE logonHandle, __in PCWSTR jobObjName,__in PW
wchar_t* curr_dir = NULL;
FILE *stream = NULL;
+ if (NULL != logonHandle) {
+ dwErrorCode = ValidateImpersonateAccessCheck(logonHandle);
+ if (dwErrorCode) {
+ return dwErrorCode;
+ }
+ }
+
// Create un-inheritable job object handle and set job object to terminate
// when last handle is closed. So winutils.exe invocation has the only open
// job object handle. Exit of winutils.exe ensures termination of job object.
@@ -305,7 +535,8 @@ DWORD CreateTask(__in PCWSTR jobObjName,__in PWSTR cmdLine)
// Returns:
// ERROR_SUCCESS: On success
// GetLastError: otherwise
-DWORD CreateTaskAsUser(__in PCWSTR jobObjName,__in PWSTR user, __in PWSTR pidFilePath, __in PWSTR cmdLine)
+DWORD CreateTaskAsUser(__in PCWSTR jobObjName,
+ __in PCWSTR user, __in PCWSTR pidFilePath, __in PCWSTR cmdLine)
{
DWORD err = ERROR_SUCCESS;
DWORD exitCode = EXIT_FAILURE;
diff --git hadoop-common-project/hadoop-common/src/main/winutils/winutils.mc hadoop-common-project/hadoop-common/src/main/winutils/winutils.mc
new file mode 100644
index 0000000..a2e30ad
--- /dev/null
+++ hadoop-common-project/hadoop-common/src/main/winutils/winutils.mc
@@ -0,0 +1,46 @@
+; // winutils.mc
+
+; // EventLog messages for Hadoop winutils service.
+
+
+LanguageNames=(English=0x409:MSG00409)
+
+
+; // The following are the categories of events.
+
+MessageIdTypedef=WORD
+
+MessageId=0x1
+SymbolicName=SERVICE_CATEGORY
+Language=English
+Service Events
+.
+
+MessageId=0x2
+SymbolicName=LOG_CATEGORY
+Language=English
+Task Events
+.
+
+; // The following are the message definitions.
+
+MessageIdTypedef=DWORD
+
+MessageId=0x80
+SymbolicName=MSG_CHECK_ERROR
+Language=English
+%1. Error %2: %3.
+.
+
+MessageId=0x100
+SymbolicName=MSG_RPC_SERVICE_HAS_STARTED
+Language=English
+The LPC server is listenning.
+.
+
+MessageId=0x200
+SymbolicName=MSG_RPC_SERVICE_HAS_STOPPED
+Language=English
+The LPC server has stopped listenning.
+.
+
diff --git hadoop-common-project/hadoop-common/src/main/winutils/winutils.vcxproj hadoop-common-project/hadoop-common/src/main/winutils/winutils.vcxproj
index 5b9a195..f216b71 100644
--- hadoop-common-project/hadoop-common/src/main/winutils/winutils.vcxproj
+++ hadoop-common-project/hadoop-common/src/main/winutils/winutils.vcxproj
@@ -1,5 +1,4 @@
-
-
@@ -86,11 +84,6 @@
true
-
- true
-
- ..\..\..\target\winutils\$(Configuration)\
-
false
@@ -124,6 +117,10 @@
Console
true
+
+ X64
+ ..\..\..\target\winutils\$(Configuration)\
+
@@ -159,7 +156,40 @@
true
+
+
+ $(IntermediateOutputPath)
+
+
+ Compiling Messages
+ mc.exe $(TargetName).mc -z $(TargetName)_msg -r $(IntermediateOutputPath) -h $(IntermediateOutputPath) -U
+ $(IntermediateOutputPath)\$(TargetName)_msg.rc,$(IntermediateOutputPath)\$(TargetName)_msg.h
+
+
+ true
+ X64
+ $(IntermediateOutputPath)
+ true
+ true
+ true
+ 2
+
+
+
+ Midl
+ ClCompile,ResourceCompile
+
+
+
+
+
+
+
+
+
+
+
@@ -179,4 +209,4 @@
-
\ No newline at end of file
+
diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ProcessTree.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ProcessTree.java
index 2f8b84d..1e2d16e 100644
--- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ProcessTree.java
+++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ProcessTree.java
@@ -296,7 +296,7 @@ public static boolean isAlive(String pid) {
return false;
} catch (IOException ioe) {
LOG.warn("Error executing shell command "
- + Arrays.toString(shexec.getExecString()) + ioe);
+ + shexec.toString() + ioe);
return false;
}
return (shexec.getExitCode() == 0 ? true : false);
@@ -321,7 +321,7 @@ public static boolean isProcessGroupAlive(String pgrpId) {
return false;
} catch (IOException ioe) {
LOG.warn("Error executing shell command "
- + Arrays.toString(shexec.getExecString()) + ioe);
+ + shexec.toString() + ioe);
return false;
}
return (shexec.getExitCode() == 0 ? true : false);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 35b61b8..44adf55 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -868,7 +868,7 @@
*/
public static final String NM_WINDOWS_SECURE_CONTAINER_GROUP =
NM_PREFIX + "windows-secure-container-executor.group";
-
+
/** T-file compression types used to compress aggregated logs.*/
public static final String NM_LOG_AGG_COMPRESSION_TYPE =
NM_PREFIX + "log-aggregation.compression-type";
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
index 5ec9c4c..991bb8d 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
@@ -31,9 +31,11 @@
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
+import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
@@ -41,6 +43,7 @@
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.Shell.ExitCodeException;
+import org.apache.hadoop.util.Shell.ICommandExecutor;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -183,20 +186,16 @@ public int launchContainer(Container container,
// create log dir under app
// fork script
- ShellCommandExecutor shExec = null;
+ Shell.ICommandExecutor shExec = null;
try {
setScriptExecutable(launchDst, userName);
setScriptExecutable(sb.getWrapperScriptPath(), userName);
- // Setup command to run
- String[] command = getRunCommand(sb.getWrapperScriptPath().toString(),
- containerIdStr, userName, pidFile, this.getConf());
-
- LOG.info("launchContainer: " + Arrays.toString(command));
- shExec = new ShellCommandExecutor(
- command,
+ shExec = buildCommandExecutor(sb.getWrapperScriptPath().toString(),
+ containerIdStr, userName, pidFile, this.getConf(),
new File(containerWorkDir.toUri().getPath()),
- container.getLaunchContext().getEnvironment()); // sanitized env
+ container.getLaunchContext().getEnvironment());
+
if (isContainerActive(containerId)) {
shExec.execute();
}
@@ -241,11 +240,25 @@ public int launchContainer(Container container,
}
return exitCode;
} finally {
- ; //
+ shExec.dispose(); //
}
return 0;
}
+ protected ICommandExecutor buildCommandExecutor(String wrapperScriptPath, String containerIdStr,
+ String userName, Path pidFile, Configuration conf, File wordDir, Map environment)
+ throws IOException {
+
+ String[] command = getRunCommand(wrapperScriptPath,
+ containerIdStr, userName, pidFile, this.getConf());
+
+ LOG.info("launchContainer: " + Arrays.toString(command));
+ return new ShellCommandExecutor(
+ command,
+ wordDir,
+ environment);
+ }
+
protected LocalWrapperScriptBuilder getLocalWrapperScriptBuilder(
String containerIdStr, Path containerWorkDir) {
return Shell.WINDOWS ?
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/WindowsSecureContainerExecutor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/WindowsSecureContainerExecutor.java
index 30beaf8..572d703 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/WindowsSecureContainerExecutor.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/WindowsSecureContainerExecutor.java
@@ -17,25 +17,34 @@
*/
package org.apache.hadoop.yarn.server.nodemanager;
+import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
import java.io.PrintStream;
+import java.io.Reader;
import java.net.InetSocketAddress;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.io.nativeio.NativeIO.WinutilsProcessStub;
+import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.util.Shell.ICommandExecutor;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.LocalWrapperScriptBuilder;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
/**
@@ -43,7 +52,7 @@
*
*/
public class WindowsSecureContainerExecutor extends DefaultContainerExecutor {
-
+
private static final Log LOG = LogFactory
.getLog(WindowsSecureContainerExecutor.class);
@@ -59,6 +68,126 @@ protected void writeLocalWrapperScript(Path launchDst, Path pidFile, PrintStream
pout.format("@call \"%s\"", launchDst);
}
}
+
+ private static class WintuilsProcessStubExecutor implements Shell.ICommandExecutor {
+ private WinutilsProcessStub processStub;
+ private StringBuilder output = new StringBuilder();
+ private int exitCode;
+
+ private enum State {
+ INIT,
+ RUNNING,
+ COMPLETE
+ };
+
+ private State state;;
+
+ private final String cwd;
+ private final String jobName;
+ private final String userName;
+ private final String pidFile;
+ private final String cmdLine;
+ private final Configuration conf;
+
+ public WintuilsProcessStubExecutor(
+ Configuration conf,
+ String cwd,
+ String jobName,
+ String userName,
+ String pidFile,
+ String cmdLine) {
+ this.conf = conf;
+ this.cwd = cwd;
+ this.jobName = jobName;
+ this.userName = userName;
+ this.pidFile = pidFile;
+ this.cmdLine = cmdLine;
+ this.state = State.INIT;
+ }
+
+ private void assumeComplete() throws IOException {
+ if (state != State.COMPLETE) {
+ throw new IOException("Process is not complete");
+ }
+ }
+
+ public String getOutput () throws IOException {
+ assumeComplete();
+ return output.toString();
+ }
+
+ public int getExitCode() throws IOException {
+ assumeComplete();
+ return exitCode;
+ }
+
+ public void validateResult() throws IOException {
+ assumeComplete();
+ if (0 != exitCode) {
+ LOG.warn(output.toString());
+ throw new IOException("Processs exit code is:" + exitCode);
+ }
+ }
+
+ private Thread startStreamReader(final InputStream stream) throws IOException {
+ Thread streamReaderThread = new Thread() {
+
+ @Override
+ public void run() {
+ try
+ {
+ BufferedReader rdr = new BufferedReader(
+ new InputStreamReader(stream));
+
+ String line = rdr.readLine();
+ while((line != null) && !isInterrupted()) {
+ synchronized(output) {
+ output.append(line);
+ output.append(System.getProperty("line.separator"));
+ }
+ line = rdr.readLine();
+ }
+ }
+ catch(Throwable t) {
+ LOG.error("Error occured reading the process stdout", t);
+ }
+ }
+ };
+ streamReaderThread.start();
+ return streamReaderThread;
+ }
+
+ public void execute() throws IOException {
+ if (state != State.INIT) {
+ throw new IOException("Process is already started");
+ }
+ processStub = NativeIO.createTaskAsUser(cwd, jobName, userName, pidFile, cmdLine);
+ state = State.RUNNING;
+
+ Thread stdOutReader = startStreamReader(processStub.getInputStream());
+ Thread stdErrReader = startStreamReader(processStub.getErrorStream());
+
+ try {
+ processStub.resume();
+ processStub.waitFor();
+ stdOutReader.join();
+ stdErrReader.join();
+ }
+ catch(InterruptedException ie) {
+ throw new IOException(ie);
+ }
+
+ exitCode = processStub.exitValue();
+ state = State.COMPLETE;
+ }
+
+ @Override
+ public void dispose() {
+ if (processStub != null) {
+ processStub.dispose();
+ }
+ }
+ }
private String nodeManagerGroup;
@@ -125,21 +254,12 @@ public void startLocalizer(Path nmPrivateContainerTokens,
copyFile(nmPrivateContainerTokens, tokenDst, user);
List command ;
- String[] commandArray;
- ShellCommandExecutor shExec;
File cwdApp = new File(appStorageDir.toString());
LOG.info(String.format("cwdApp: %s", cwdApp));
command = new ArrayList();
- command.add(Shell.WINUTILS);
- command.add("task");
- command.add("createAsUser");
- command.add("START_LOCALIZER_" + locId);
- command.add(user);
- command.add("nul:"); // PID file
-
//use same jvm as parent
File jvm = new File(new File(System.getProperty("java.home"), "bin"), "java.exe");
command.add(jvm.toString());
@@ -160,12 +280,31 @@ public void startLocalizer(Path nmPrivateContainerTokens,
}
ContainerLocalizer.buildMainArgs(command, user, appId, locId, nmAddr, localDirs);
- commandArray = command.toArray(new String[command.size()]);
-
- shExec = new ShellCommandExecutor(
- commandArray, cwdApp);
-
- shExec.execute();
+
+ String cmdLine = StringUtils.join(command, " ");
+
+ WintuilsProcessStubExecutor stubExecutor = new WintuilsProcessStubExecutor(
+ getConf(),
+ cwdApp.getAbsolutePath(),
+ "START_LOCALIZER_" + locId, user, "nul:", cmdLine);
+ try {
+ stubExecutor.execute();
+ stubExecutor.validateResult();
+ }
+ finally {
+ stubExecutor.dispose();
+ }
}
+
+ @Override
+ protected ICommandExecutor buildCommandExecutor(String wrapperScriptPath, String containerIdStr,
+ String userName, Path pidFile, Configuration conf, File wordDir, Map environment)
+ throws IOException {
+
+ return new WintuilsProcessStubExecutor(
+ getConf(),
+ wordDir.toString(),
+ containerIdStr, userName, pidFile.toString(), "cmd /c " + wrapperScriptPath);
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java
index 3525170..762565b 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java
@@ -369,10 +369,13 @@ public static void main(String[] argv) throws Throwable {
new ContainerLocalizer(FileContext.getLocalFSFileContext(), user,
appId, locId, localDirs,
RecordFactoryProvider.getRecordFactory(null));
- System.exit(localizer.runLocalization(nmAddr));
+ int nRet = localizer.runLocalization(nmAddr);
+ LOG.info(String.format("nRet: %d", nRet));
+ System.exit(nRet);
} catch (Throwable e) {
// Print error to stdout so that LCE can use it.
e.printStackTrace(System.out);
+ LOG.error("Exception in main:", e);
throw e;
}
}