diff --git a/hcatalog/webhcat/svr/src/main/config/webhcat-default.xml b/hcatalog/webhcat/svr/src/main/config/webhcat-default.xml
index a27419a..1bef3c6 100644
--- a/hcatalog/webhcat/svr/src/main/config/webhcat-default.xml
+++ b/hcatalog/webhcat/svr/src/main/config/webhcat-default.xml
@@ -80,32 +80,38 @@
+ templeton.python
+ ${env.PYTHON_CMD}
+ The path to the Python executable.
+
+
+
templeton.pig.archive
- hdfs:///apps/templeton/pig-0.10.1.tar.gz
+
The path to the Pig archive.
templeton.pig.path
- pig-0.10.1.tar.gz/pig-0.10.1/bin/pig
+ pig-0.11.1.tar.gz/pig-0.11.1/bin/pig
The path to the Pig executable.
templeton.hcat
- ${env.HCAT_PREFIX}/bin/hcat
+ ${env.HCAT_PREFIX}/bin/hcat.py
The path to the hcatalog executable.
templeton.hive.archive
- hdfs:///apps/templeton/hive-0.10.0.tar.gz
+
The path to the Hive archive.
templeton.hive.path
- hive-0.10.0.tar.gz/hive-0.10.0/bin/hive
+ hive-0.11.0.tar.gz/hive-0.11.0/bin/hive
The path to the Hive executable.
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java
index 5f5ee54..a22a15d 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java
@@ -87,6 +87,7 @@
public static final String HADOOP_NAME = "templeton.hadoop";
public static final String HADOOP_CONF_DIR = "templeton.hadoop.conf.dir";
public static final String HCAT_NAME = "templeton.hcat";
+ public static final String PYTHON_NAME = "templeton.python";
public static final String HIVE_ARCHIVE_NAME = "templeton.hive.archive";
public static final String HIVE_PATH_NAME = "templeton.hive.path";
public static final String HIVE_PROPS_NAME = "templeton.hive.properties";
@@ -181,6 +182,7 @@ private boolean loadOneClasspathConfig(String fname) {
public String hadoopQueueName() { return get(HADOOP_QUEUE_NAME); }
public String clusterHadoop() { return get(HADOOP_NAME); }
public String clusterHcat() { return get(HCAT_NAME); }
+ public String clusterPython() { return get(PYTHON_NAME); }
public String pigPath() { return get(PIG_PATH_NAME); }
public String pigArchive() { return get(PIG_ARCHIVE_NAME); }
public String hivePath() { return get(HIVE_PATH_NAME); }
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/ExecServiceImpl.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/ExecServiceImpl.java
index 77ee6af..a01275f 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/ExecServiceImpl.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/ExecServiceImpl.java
@@ -18,12 +18,18 @@
*/
package org.apache.hive.hcatalog.templeton;
+import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.PrintWriter;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.concurrent.Semaphore;
import org.apache.commons.exec.CommandLine;
@@ -33,6 +39,38 @@
import org.apache.commons.exec.PumpStreamHandler;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.Shell;
+
+class StreamOutputWriter extends Thread
+{
+ InputStream is;
+ String type;
+ PrintWriter out;
+
+ StreamOutputWriter(InputStream is, String type, OutputStream outStream)
+ {
+ this.is = is;
+ this.type = type;
+ this.out = new PrintWriter(outStream, true);
+ }
+
+ @Override
+ public void run()
+ {
+ try
+ {
+ BufferedReader br =
+ new BufferedReader(new InputStreamReader(is));
+ String line = null;
+ while ( (line = br.readLine()) != null){
+ out.println(line);
+ }
+ } catch (IOException ioe)
+ {
+ ioe.printStackTrace();
+ }
+ }
+}
/**
* Execute a local program. This is a singleton service that will
@@ -45,6 +83,9 @@
private static volatile ExecServiceImpl theSingleton;
+ /** Windows CreateProcess synchronization object */
+ private static final Object WindowsProcessLaunchLock = new Object();
+
/**
* Retrieve the singleton.
*/
@@ -133,7 +174,54 @@ private ExecBean auxRun(String program, List args, Map e
LOG.info("Running: " + cmd);
ExecBean res = new ExecBean();
- res.exitcode = executor.execute(cmd, execEnv(env));
+
+ if(Shell.WINDOWS){
+ //The default executor sometimes causes failures on Windows: the hcat
+ // command occasionally returns a non-zero exit status with it, apparently
+ // due to race conditions in process handling on Windows.
+ env = execEnv(env);
+ String[] envVals = new String[env.size()];
+ int i=0;
+ for( Entry kv : env.entrySet()){
+ envVals[i++] = kv.getKey() + "=" + kv.getValue();
+ LOG.info("Setting " + kv.getKey() + "=" + kv.getValue());
+ }
+
+ Process proc;
+ synchronized (WindowsProcessLaunchLock) {
+ // To workaround the race condition issue with child processes
+ // inheriting unintended handles during process launch that can
+ // lead to hangs on reading output and error streams, we
+ // serialize process creation. More info available at:
+ // http://support.microsoft.com/kb/315939
+ proc = Runtime.getRuntime().exec(cmd.toStrings(), envVals);
+ }
+
+ //consume stderr
+ StreamOutputWriter errorGobbler = new
+ StreamOutputWriter(proc.getErrorStream(), "ERROR", errStream);
+
+ //consume stdout
+ StreamOutputWriter outputGobbler = new
+ StreamOutputWriter(proc.getInputStream(), "OUTPUT", outStream);
+
+ //start collecting input streams
+ errorGobbler.start();
+ outputGobbler.start();
+ //execute
+ try{
+ res.exitcode = proc.waitFor();
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ //flush
+ errorGobbler.out.flush();
+ outputGobbler.out.flush();
+ }
+ else {
+ res.exitcode = executor.execute(cmd, execEnv(env));
+ }
+
String enc = appConf.get(AppConfig.EXEC_ENCODING_NAME);
res.stdout = outStream.toString(enc);
res.stderr = errStream.toString(enc);
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HcatDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HcatDelegator.java
index 387cce8..3ea10df 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HcatDelegator.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HcatDelegator.java
@@ -67,7 +67,11 @@ public ExecBean run(String user, String exec, boolean format,
Map env = TempletonUtils.hadoopUserEnv(user, cp);
proxy.addEnv(env);
proxy.addArgs(args);
- return execService.run(appConf.clusterHcat(), args, env);
+ if (appConf.clusterHcat().toLowerCase().endsWith(".py")) {
+ return execService.run(appConf.clusterPython(), args, env);
+ } else {
+ return execService.run(appConf.clusterHcat(), args, env);
+ }
} catch (InterruptedException e) {
throw new IOException(e);
} finally {
@@ -79,8 +83,12 @@ public ExecBean run(String user, String exec, boolean format,
private List makeArgs(String exec, boolean format,
String group, String permissions) {
ArrayList args = new ArrayList();
+ if (appConf.clusterHcat().toLowerCase().endsWith(".py")) {
+ // hcat.py becomes the first argument passed to the "python" command
+ args.add(appConf.clusterHcat());
+ }
args.add("-e");
- args.add(exec);
+ args.add('"' + exec + '"');
if (TempletonUtils.isset(group)) {
args.add("-g");
args.add(group);
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java
index 2e0d5bb..504eb7e 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java
@@ -64,6 +64,7 @@ public EnqueueBean run(String user, Map userArgs,
try {
args.addAll(makeBasicArgs(execute, srcFile, otherFiles, statusdir, completedUrl, enablelog));
args.add("--");
+ TempletonUtils.addCmdForWindows(args);
args.add(appConf.hivePath());
args.add("--service");
@@ -122,8 +123,11 @@ public EnqueueBean run(String user, Map userArgs,
args.addAll(makeLauncherArgs(appConf, statusdir, completedUrl, allFiles,
enablelog, JobType.HIVE));
- args.add("-archives");
- args.add(appConf.hiveArchive());
+ if (appConf.hiveArchive() != null && !appConf.hiveArchive().equals(""))
+ {
+ args.add("-archives");
+ args.add(appConf.hiveArchive());
+ }
return args;
}
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java
index 82e16bc..ea097ab 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java
@@ -68,6 +68,7 @@ public EnqueueBean run(String user, Map userArgs, String jar, St
args.addAll(makeLauncherArgs(appConf, statusdir,
completedUrl, allFiles, enablelog, jobType));
args.add("--");
+ TempletonUtils.addCmdForWindows(args);
args.add(appConf.clusterHadoop());
args.add("jar");
args.add(TempletonUtils.hadoopFsPath(jar, appConf, runAs).getName());
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java
index 2e011a4..c170bb8 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java
@@ -70,10 +70,14 @@ public EnqueueBean run(String user, Map userArgs,
}
args.addAll(makeLauncherArgs(appConf, statusdir, completedUrl, allFiles, enablelog, JobType.PIG));
- args.add("-archives");
- args.add(appConf.pigArchive());
+ if (appConf.pigArchive() != null && !appConf.pigArchive().equals(""))
+ {
+ args.add("-archives");
+ args.add(appConf.pigArchive());
+ }
args.add("--");
+ TempletonUtils.addCmdForWindows(args);
args.add(appConf.pigPath());
//the token file location should be first argument of pig
args.add("-D" + TempletonControllerJob.TOKEN_FILE_ARG_PLACEHOLDER);
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HDFSCleanup.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HDFSCleanup.java
index 3511159..1202ffe 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HDFSCleanup.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HDFSCleanup.java
@@ -98,7 +98,7 @@ public void run() {
// cycle fails, it'll try again on the next cycle.
try {
if (fs == null) {
- fs = FileSystem.get(appConf);
+ fs = new Path(storage_root).getFileSystem(appConf);
}
checkFiles(fs);
} catch (Exception e) {
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HDFSStorage.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HDFSStorage.java
index 14956da..4eb21a3 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HDFSStorage.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HDFSStorage.java
@@ -208,7 +208,7 @@ public boolean delete(Type type, String id) throws NotFoundException {
public void openStorage(Configuration config) throws IOException {
storage_root = config.get(TempletonStorage.STORAGE_ROOT);
if (fs == null) {
- fs = FileSystem.get(config);
+ fs = new Path(storage_root).getFileSystem(config);
}
}
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java
index 7e91951..1a40f2d 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java
@@ -48,12 +48,15 @@
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
+import org.apache.hive.hcatalog.templeton.BadParam;
import org.apache.hive.hcatalog.templeton.LauncherDelegator;
import org.apache.hive.hcatalog.templeton.LogRetriever;
import org.apache.hive.hcatalog.templeton.Main;
@@ -106,6 +109,9 @@ protected Process startJob(Context context, String user,
ArrayList removeEnv = new ArrayList();
removeEnv.add("HADOOP_ROOT_LOGGER");
+ removeEnv.add("hadoop-command");
+ removeEnv.add("CLASS");
+ removeEnv.add("mapredcommand");
Map env = TempletonUtils.hadoopUserEnv(user,
overrideClasspath);
List jarArgsList = new LinkedList(Arrays.asList(jarArgs));
@@ -114,7 +120,15 @@ protected Process startJob(Context context, String user,
if (tokenFile != null) {
//Token is available, so replace the placeholder
+ tokenFile = tokenFile.replaceAll("\"", "");
String tokenArg = "mapreduce.job.credentials.binary=" + tokenFile;
+ if (Shell.WINDOWS) {
+ try {
+ tokenArg = TempletonUtils.quoteForWindows(tokenArg);
+ } catch (BadParam e) {
+ throw new IOException("cannot pass " + tokenFile + " to mapreduce.job.credentials.binary", e);
+ }
+ }
for(int i=0; i cmd, List removeEnv,
Map environmentVariables)
throws IOException {
- System.err.println("templeton: starting " + cmd);
- System.err.print("With environment variables: ");
- for (Map.Entry keyVal : environmentVariables.entrySet()) {
- System.err.println(keyVal.getKey() + "=" + keyVal.getValue());
- }
+ logDebugCmd(cmd, environmentVariables);
ProcessBuilder pb = new ProcessBuilder(cmd);
for (String key : removeEnv)
pb.environment().remove(key);
@@ -53,4 +53,20 @@ public Process run(List cmd, List removeEnv,
return pb.start();
}
+ private void logDebugCmd(List cmd,
+ Map environmentVariables) {
+ if(!LOG.isDebugEnabled()){
+ return;
+ }
+ LOG.debug("starting " + cmd);
+ LOG.debug("With environment variables: " );
+ for(Map.Entry keyVal : environmentVariables.entrySet()){
+ LOG.debug(keyVal.getKey() + "=" + keyVal.getValue());
+ }
+ LOG.debug("With environment variables already set: " );
+ Map env = System.getenv();
+ for (String envName : env.keySet()) {
+ LOG.debug(envName + "=" + env.get(envName));
+ }
+ }
}
diff --git a/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestServer.java b/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestServer.java
index 2f89ad4..cf02c76 100644
--- a/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestServer.java
+++ b/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestServer.java
@@ -31,7 +31,7 @@
MockServer server;
public void setUp() {
- new Main(null); // Initialize the config
+ new Main(new String[]{}); // Initialize the config
server = new MockServer();
}
diff --git a/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/tool/TestTempletonUtils.java b/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/tool/TestTempletonUtils.java
index 5f536d6..c97a90d 100644
--- a/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/tool/TestTempletonUtils.java
+++ b/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/tool/TestTempletonUtils.java
@@ -159,6 +159,12 @@ public void testHadoopFsPath() {
@Test
public void testHadoopFsFilename() {
try {
+ String tmpFileName1 = "/tmp/testHadoopFsListAsArray1";
+ String tmpFileName2 = "/tmp/testHadoopFsListAsArray2";
+ File tmpFile1 = new File(tmpFileName1);
+ File tmpFile2 = new File(tmpFileName2);
+ tmpFile1.createNewFile();
+ tmpFile2.createNewFile();
Assert.assertEquals(null, TempletonUtils.hadoopFsFilename(null, null, null));
Assert.assertEquals(null,
TempletonUtils.hadoopFsFilename(tmpFile.toURI().toString(), null, null));
@@ -188,14 +194,22 @@ public void testHadoopFsFilename() {
@Test
public void testHadoopFsListAsArray() {
try {
+ String tmpFileName1 = "/tmp/testHadoopFsListAsArray1";
+ String tmpFileName2 = "/tmp/testHadoopFsListAsArray2";
+ File tmpFile1 = new File(tmpFileName1);
+ File tmpFile2 = new File(tmpFileName2);
+ tmpFile1.createNewFile();
+ tmpFile2.createNewFile();
Assert.assertTrue(TempletonUtils.hadoopFsListAsArray(null, null, null) == null);
- Assert.assertTrue(TempletonUtils.hadoopFsListAsArray(
- tmpFile.toURI().toString() + "," + usrFile.toString(), null, null) == null);
- String[] tmp2 = TempletonUtils.hadoopFsListAsArray(
- tmpFile.toURI().toString() + "," + usrFile.toURI().toString(),
- new Configuration(), null);
- Assert.assertEquals(tmpFile.toURI().toString(), tmp2[0]);
- Assert.assertEquals(usrFile.toURI().toString(), tmp2[1]);
+ Assert.assertTrue(TempletonUtils.hadoopFsListAsArray(tmpFileName1 + "," + tmpFileName2,
+ null, null) == null);
+ String[] tmp2
+ = TempletonUtils.hadoopFsListAsArray(tmpFileName1 + "," + tmpFileName2,
+ new Configuration(), null);
+ Assert.assertEquals("file:" + tmpFileName1, tmp2[0]);
+ Assert.assertEquals("file:" + tmpFileName2, tmp2[1]);
+ tmpFile1.delete();
+ tmpFile2.delete();
} catch (FileNotFoundException e) {
Assert.fail("Couldn't find name for " + tmpFile.toURI().toString());
} catch (Exception e) {
@@ -218,15 +232,18 @@ public void testHadoopFsListAsArray() {
@Test
public void testHadoopFsListAsString() {
try {
+ String tmpFileName1 = "/tmp/testHadoopFsListAsString1";
+ String tmpFileName2 = "/tmp/testHadoopFsListAsString2";
+ File tmpFile1 = new File(tmpFileName1);
+ File tmpFile2 = new File(tmpFileName2);
+ tmpFile1.createNewFile();
+ tmpFile2.createNewFile();
Assert.assertTrue(TempletonUtils.hadoopFsListAsString(null, null, null) == null);
- Assert.assertTrue(TempletonUtils.hadoopFsListAsString(
- tmpFile.toURI().toString() + "," + usrFile.toURI().toString(),
+ Assert.assertTrue(TempletonUtils.hadoopFsListAsString("/tmp,/usr",
null, null) == null);
- Assert.assertEquals(
- tmpFile.toURI().toString() + "," + usrFile.toURI().toString(),
- TempletonUtils.hadoopFsListAsString(
- tmpFile.toURI().toString() + "," + usrFile.toURI().toString(),
- new Configuration(), null));
+ Assert.assertEquals("file:" + tmpFileName1 + ",file:" + tmpFileName2,
+ TempletonUtils.hadoopFsListAsString
+ (tmpFileName1 + "," + tmpFileName2, new Configuration(), null));
} catch (FileNotFoundException e) {
Assert.fail("Couldn't find name for " + tmpFile.toURI().toString());
} catch (Exception e) {