diff --git a/hcatalog/webhcat/svr/src/main/config/webhcat-default.xml b/hcatalog/webhcat/svr/src/main/config/webhcat-default.xml
index a27419a..1bef3c6 100644
--- a/hcatalog/webhcat/svr/src/main/config/webhcat-default.xml
+++ b/hcatalog/webhcat/svr/src/main/config/webhcat-default.xml
@@ -80,32 +80,38 @@
+  <property>
+    <name>templeton.python</name>
+    <value>${env.PYTHON_CMD}</value>
+    <description>The path to the python executable.</description>
+  </property>
+
   <property>
     <name>templeton.pig.archive</name>
-    <value>hdfs:///apps/templeton/pig-0.10.1.tar.gz</value>
+    <value></value>
     <description>The path to the Pig archive.</description>
   </property>
 
   <property>
     <name>templeton.pig.path</name>
-    <value>pig-0.10.1.tar.gz/pig-0.10.1/bin/pig</value>
+    <value>pig-0.11.1.tar.gz/pig-0.11.1/bin/pig</value>
     <description>The path to the Pig executable.</description>
   </property>
 
   <property>
     <name>templeton.hcat</name>
-    <value>${env.HCAT_PREFIX}/bin/hcat</value>
+    <value>${env.HCAT_PREFIX}/bin/hcat.py</value>
     <description>The path to the hcatalog executable.</description>
   </property>
 
   <property>
     <name>templeton.hive.archive</name>
-    <value>hdfs:///apps/templeton/hive-0.10.0.tar.gz</value>
+    <value></value>
     <description>The path to the Hive archive.</description>
   </property>
 
   <property>
     <name>templeton.hive.path</name>
-    <value>hive-0.10.0.tar.gz/hive-0.10.0/bin/hive</value>
+    <value>hive-0.11.0.tar.gz/hive-0.11.0/bin/hive</value>
     <description>The path to the Hive executable.</description>
   </property>
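Note: templeton.python falls back to ${env.PYTHON_CMD} when unset, and templeton.hcat now points at the hcat.py wrapper so WebHCat can shell out through a python interpreter on Windows. A deployment that cannot rely on the PYTHON_CMD environment variable can pin the interpreter in webhcat-site.xml; a minimal sketch (the python.exe path below is illustrative, not part of this patch):

    <property>
      <name>templeton.python</name>
      <value>C:\Python27\python.exe</value>
    </property>

The emptied templeton.pig.archive and templeton.hive.archive defaults mean no archive is shipped with the launcher job unless a site configures one explicitly; the delegator changes later in this patch skip the -archives flag in that case.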
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java
index 5f5ee54..a22a15d 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java
@@ -87,6 +87,7 @@
   public static final String HADOOP_NAME        = "templeton.hadoop";
   public static final String HADOOP_CONF_DIR    = "templeton.hadoop.conf.dir";
   public static final String HCAT_NAME          = "templeton.hcat";
+  public static final String PYTHON_NAME        = "templeton.python";
   public static final String HIVE_ARCHIVE_NAME  = "templeton.hive.archive";
   public static final String HIVE_PATH_NAME     = "templeton.hive.path";
   public static final String HIVE_PROPS_NAME    = "templeton.hive.properties";
@@ -181,6 +182,7 @@ private boolean loadOneClasspathConfig(String fname) {
   public String hadoopQueueName() { return get(HADOOP_QUEUE_NAME); }
   public String clusterHadoop()   { return get(HADOOP_NAME); }
   public String clusterHcat()     { return get(HCAT_NAME); }
+  public String clusterPython()   { return get(PYTHON_NAME); }
   public String pigPath()         { return get(PIG_PATH_NAME); }
   public String pigArchive()      { return get(PIG_ARCHIVE_NAME); }
   public String hivePath()        { return get(HIVE_PATH_NAME); }
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/ExecServiceImpl.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/ExecServiceImpl.java
index 77ee6af..a01275f 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/ExecServiceImpl.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/ExecServiceImpl.java
@@ -18,12 +18,18 @@
  */
 package org.apache.hive.hcatalog.templeton;
 
+import java.io.BufferedReader;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.PrintWriter;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.Semaphore;
 
 import org.apache.commons.exec.CommandLine;
@@ -33,6 +39,38 @@
 import org.apache.commons.exec.PumpStreamHandler;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.Shell;
+
+class StreamOutputWriter extends Thread
+{
+  InputStream is;
+  String type;
+  PrintWriter out;
+
+  StreamOutputWriter(InputStream is, String type, OutputStream outStream)
+  {
+    this.is = is;
+    this.type = type;
+    this.out = new PrintWriter(outStream, true);
+  }
+
+  @Override
+  public void run()
+  {
+    try
+    {
+      BufferedReader br = new BufferedReader(new InputStreamReader(is));
+      String line = null;
+      while ((line = br.readLine()) != null) {
+        out.println(line);
+      }
+    } catch (IOException ioe)
+    {
+      ioe.printStackTrace();
+    }
+  }
+}
 
 /**
  * Execute a local program.  This is a singleton service that will
@@ -45,6 +83,9 @@
   private static volatile ExecServiceImpl theSingleton;
 
+  /** Windows CreateProcess synchronization object */
+  private static final Object WindowsProcessLaunchLock = new Object();
+
   /**
    * Retrieve the singleton.
    */
@@ -133,7 +174,54 @@ private ExecBean auxRun(String program, List<String> args, Map<String, String> env)
     LOG.info("Running: " + cmd);
     ExecBean res = new ExecBean();
-    res.exitcode = executor.execute(cmd, execEnv(env));
+
+    if (Shell.WINDOWS) {
+      //The default executor is sometimes causing failure on windows. hcat
+      // command sometimes returns non zero exit status with it. It seems
+      // to hit some race conditions on windows.
+      env = execEnv(env);
+      String[] envVals = new String[env.size()];
+      int i = 0;
+      for (Entry<String, String> kv : env.entrySet()) {
+        envVals[i++] = kv.getKey() + "=" + kv.getValue();
+        LOG.info("Setting " + kv.getKey() + "=" + kv.getValue());
+      }
+
+      Process proc;
+      synchronized (WindowsProcessLaunchLock) {
+        // To workaround the race condition issue with child processes
+        // inheriting unintended handles during process launch that can
+        // lead to hangs on reading output and error streams, we
+        // serialize process creation. More info available at:
+        // http://support.microsoft.com/kb/315939
+        proc = Runtime.getRuntime().exec(cmd.toStrings(), envVals);
+      }
+
+      //consume stderr
+      StreamOutputWriter errorGobbler = new
+        StreamOutputWriter(proc.getErrorStream(), "ERROR", errStream);
+
+      //consume stdout
+      StreamOutputWriter outputGobbler = new
+        StreamOutputWriter(proc.getInputStream(), "OUTPUT", outStream);
+
+      //start collecting input streams
+      errorGobbler.start();
+      outputGobbler.start();
+      //execute
+      try {
+        res.exitcode = proc.waitFor();
+      } catch (InterruptedException e) {
+        throw new IOException(e);
+      }
+      //flush
+      errorGobbler.out.flush();
+      outputGobbler.out.flush();
+    }
+    else {
+      res.exitcode = executor.execute(cmd, execEnv(env));
+    }
+
     String enc = appConf.get(AppConfig.EXEC_ENCODING_NAME);
     res.stdout = outStream.toString(enc);
     res.stderr = errStream.toString(enc);
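Note: the Windows branch above trades commons-exec's executor for a raw Runtime.exec plus two reader threads. Those threads are not just logging plumbing: a child process that fills its stdout or stderr pipe blocks until someone drains it, so both streams must be consumed concurrently while waitFor() is pending. The patch flushes the writers after waitFor(); joining the gobbler threads as well, as in the sketch below, also closes the small window where the process exits before its last lines are pumped. A self-contained sketch of the pattern (class and command names are illustrative, not from the patch):

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.InputStreamReader;

    public class GobblerDemo {
      // Drain one stream on its own thread so the child never blocks on a full pipe.
      static Thread drain(final InputStream in, final StringBuilder sink) {
        Thread t = new Thread() {
          public void run() {
            try {
              BufferedReader r = new BufferedReader(new InputStreamReader(in));
              String line;
              while ((line = r.readLine()) != null) {
                sink.append(line).append('\n');
              }
            } catch (IOException ignored) {
            }
          }
        };
        t.start();
        return t;
      }

      public static void main(String[] args) throws Exception {
        // "cmd /c dir" would be the Windows analogue; "ls" keeps the demo portable.
        Process p = Runtime.getRuntime().exec(new String[]{"ls"});
        StringBuilder out = new StringBuilder(), err = new StringBuilder();
        Thread o = drain(p.getInputStream(), out);
        Thread e = drain(p.getErrorStream(), err);
        int rc = p.waitFor();   // safe: both pipes are being emptied concurrently
        o.join();               // make sure all output has been captured
        e.join();
        System.out.println("exit=" + rc + "\n" + out);
      }
    }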
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HcatDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HcatDelegator.java
index 387cce8..3ea10df 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HcatDelegator.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HcatDelegator.java
@@ -67,7 +67,11 @@ public ExecBean run(String user, String exec, boolean format,
       Map<String, String> env = TempletonUtils.hadoopUserEnv(user, cp);
       proxy.addEnv(env);
       proxy.addArgs(args);
-      return execService.run(appConf.clusterHcat(), args, env);
+      if (appConf.clusterHcat().toLowerCase().endsWith(".py")) {
+        return execService.run(appConf.clusterPython(), args, env);
+      } else {
+        return execService.run(appConf.clusterHcat(), args, env);
+      }
     } catch (InterruptedException e) {
       throw new IOException(e);
     } finally {
@@ -79,8 +83,12 @@ public ExecBean run(String user, String exec, boolean format,
   private List<String> makeArgs(String exec,
       boolean format, String group, String permissions) {
     ArrayList<String> args = new ArrayList<String>();
+    if (appConf.clusterHcat().toLowerCase().endsWith(".py")) {
+      // hcat.py will become the first argument passed to command "python"
+      args.add(appConf.clusterHcat());
+    }
     args.add("-e");
-    args.add(exec);
+    args.add('"' + exec + '"');
     if (TempletonUtils.isset(group)) {
       args.add("-g");
       args.add(group);
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java
index 2e0d5bb..504eb7e 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java
@@ -64,6 +64,7 @@ public EnqueueBean run(String user, Map<String, Object> userArgs,
     try {
       args.addAll(makeBasicArgs(execute, srcFile, otherFiles, statusdir, completedUrl, enablelog));
       args.add("--");
+      TempletonUtils.addCmdForWindows(args);
       args.add(appConf.hivePath());
 
       args.add("--service");
@@ -122,8 +123,11 @@ public EnqueueBean run(String user, Map<String, Object> userArgs,
     args.addAll(makeLauncherArgs(appConf, statusdir, completedUrl, allFiles, enablelog, JobType.HIVE));
 
-    args.add("-archives");
-    args.add(appConf.hiveArchive());
+    if (appConf.hiveArchive() != null && !appConf.hiveArchive().equals(""))
+    {
+      args.add("-archives");
+      args.add(appConf.hiveArchive());
+    }
 
     return args;
   }
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java
index 82e16bc..ea097ab 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java
@@ -68,6 +68,7 @@ public EnqueueBean run(String user, Map<String, Object> userArgs, String jar, String mainClass,
       args.addAll(makeLauncherArgs(appConf, statusdir, completedUrl, allFiles, enablelog, jobType));
       args.add("--");
+      TempletonUtils.addCmdForWindows(args);
       args.add(appConf.clusterHadoop());
       args.add("jar");
       args.add(TempletonUtils.hadoopFsPath(jar, appConf, runAs).getName());
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java
index 2e011a4..c170bb8 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java
@@ -70,10 +70,14 @@ public EnqueueBean run(String user, Map<String, Object> userArgs,
     }
     args.addAll(makeLauncherArgs(appConf, statusdir, completedUrl, allFiles, enablelog, JobType.PIG));
-    args.add("-archives");
-    args.add(appConf.pigArchive());
+    if (appConf.pigArchive() != null && !appConf.pigArchive().equals(""))
+    {
+      args.add("-archives");
+      args.add(appConf.pigArchive());
+    }
 
     args.add("--");
+    TempletonUtils.addCmdForWindows(args);
     args.add(appConf.pigPath());
     //the token file location should be first argument of pig
     args.add("-D" + TempletonControllerJob.TOKEN_FILE_ARG_PLACEHOLDER);
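Note: all three launcher delegators now call TempletonUtils.addCmdForWindows(args) right after the "--" separator, so the on-cluster command can be wrapped for the Windows shell. The helper's body is not part of this diff; presumably it is a no-op on other platforms. A hedged sketch of what such a helper could look like (the cmd /c call wrapper is an assumption, not the verbatim TempletonUtils implementation):

    // Assumed sketch only; the real body lives in TempletonUtils.
    import java.util.ArrayList;
    import org.apache.hadoop.util.Shell;

    public class WindowsCmdHelper {
      /** No-op off Windows; on Windows, launch through the cmd shell so
       *  .cmd/.py launch scripts resolve the way hadoop.cmd expects. */
      public static void addCmdForWindows(ArrayList<String> args) {
        if (Shell.WINDOWS) {
          args.add("cmd");
          args.add("/c");
          args.add("call");
        }
      }
    }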
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HDFSCleanup.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HDFSCleanup.java
index 3511159..1202ffe 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HDFSCleanup.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HDFSCleanup.java
@@ -98,7 +98,7 @@ public void run() {
       // cycle fails, it'll try again on the next cycle.
       try {
         if (fs == null) {
-          fs = FileSystem.get(appConf);
+          fs = new Path(storage_root).getFileSystem(appConf);
         }
         checkFiles(fs);
       } catch (Exception e) {
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HDFSStorage.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HDFSStorage.java
index 14956da..4eb21a3 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HDFSStorage.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HDFSStorage.java
@@ -208,7 +208,7 @@ public boolean delete(Type type, String id) throws NotFoundException {
   public void openStorage(Configuration config) throws IOException {
     storage_root = config.get(TempletonStorage.STORAGE_ROOT);
     if (fs == null) {
-      fs = FileSystem.get(config);
+      fs = new Path(storage_root).getFileSystem(config);
     }
   }
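Note: FileSystem.get(conf) always returns the filesystem behind fs.defaultFS, while Path.getFileSystem(conf) resolves the scheme of the path itself. With the two changes above, the job-state storage root (and the cleanup thread that prunes it) may live on a filesystem other than the default one. A small sketch of the distinction (paths are illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class FsResolveDemo {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Always the filesystem named by fs.defaultFS:
        FileSystem defaultFs = FileSystem.get(conf);
        // Scheme-aware: a root like file:///tmp/templeton-hadoop resolves to
        // the local fs even when fs.defaultFS points at an HDFS namenode.
        FileSystem storageFs =
            new Path("file:///tmp/templeton-hadoop").getFileSystem(conf);
        System.out.println(defaultFs.getUri() + " vs " + storageFs.getUri());
      }
    }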
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java
index 7e91951..1a40f2d 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java
@@ -48,12 +48,15 @@
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Mapper.Context;
 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
 import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
+import org.apache.hive.hcatalog.templeton.BadParam;
 import org.apache.hive.hcatalog.templeton.LauncherDelegator;
 import org.apache.hive.hcatalog.templeton.LogRetriever;
 import org.apache.hive.hcatalog.templeton.Main;
@@ -106,6 +109,9 @@ protected Process startJob(Context context, String user,
       ArrayList<String> removeEnv = new ArrayList<String>();
       removeEnv.add("HADOOP_ROOT_LOGGER");
+      removeEnv.add("hadoop-command");
+      removeEnv.add("CLASS");
+      removeEnv.add("mapredcommand");
       Map<String, String> env = TempletonUtils.hadoopUserEnv(user, overrideClasspath);
       List<String> jarArgsList = new LinkedList<String>(Arrays.asList(jarArgs));
 
@@ -114,7 +120,15 @@
       if (tokenFile != null) {
         //Token is available, so replace the placeholder
+        tokenFile = tokenFile.replaceAll("\"", "");
         String tokenArg = "mapreduce.job.credentials.binary=" + tokenFile;
+        if (Shell.WINDOWS) {
+          try {
+            tokenArg = TempletonUtils.quoteForWindows(tokenArg);
+          } catch (BadParam e) {
+            throw new IOException("cannot pass " + tokenFile + " to mapreduce.job.credentials.binary", e);
+          }
+        }
         for (int i = 0; i < jarArgsList.size(); i++) {
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TrivialExecService.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TrivialExecService.java
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TrivialExecService.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TrivialExecService.java
@@
   public Process run(List<String> cmd, List<String> removeEnv,
             Map<String, String> environmentVariables) throws IOException {
-    System.err.println("templeton: starting " + cmd);
-    System.err.print("With environment variables: ");
-    for (Map.Entry<String, String> keyVal : environmentVariables.entrySet()) {
-      System.err.println(keyVal.getKey() + "=" + keyVal.getValue());
-    }
+    logDebugCmd(cmd, environmentVariables);
     ProcessBuilder pb = new ProcessBuilder(cmd);
     for (String key : removeEnv)
       pb.environment().remove(key);
@@ -53,4 +53,20 @@
     return pb.start();
   }
+
+  private void logDebugCmd(List<String> cmd,
+      Map<String, String> environmentVariables) {
+    if (!LOG.isDebugEnabled()) {
+      return;
+    }
+    LOG.debug("starting " + cmd);
+    LOG.debug("With environment variables: ");
+    for (Map.Entry<String, String> keyVal : environmentVariables.entrySet()) {
+      LOG.debug(keyVal.getKey() + "=" + keyVal.getValue());
+    }
+    LOG.debug("With environment variables already set: ");
+    Map<String, String> env = System.getenv();
+    for (String envName : env.keySet()) {
+      LOG.debug(envName + "=" + env.get(envName));
+    }
+  }
 }
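Note: the extra removeEnv entries (hadoop-command, CLASS, mapredcommand) scrub variables apparently set by the hadoop.cmd launcher scripts on Windows; if the controller's child process inherits them, the nested hadoop invocation can pick up stale values. TrivialExecService applies the list through ProcessBuilder; a self-contained sketch of the same scrubbing (the child command here is POSIX so the demo runs anywhere):

    import java.util.Arrays;
    import java.util.List;

    public class ScrubEnvDemo {
      public static void main(String[] args) throws Exception {
        List<String> removeEnv =
            Arrays.asList("HADOOP_ROOT_LOGGER", "hadoop-command", "CLASS", "mapredcommand");
        // "set" on Windows / "env" elsewhere prints the resulting environment.
        ProcessBuilder pb = new ProcessBuilder("env");
        for (String key : removeEnv) {
          pb.environment().remove(key);   // child starts without these inherited vars
        }
        pb.inheritIO();
        int rc = pb.start().waitFor();
        System.out.println("exit=" + rc);
      }
    }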
diff --git a/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestServer.java b/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestServer.java
index 2f89ad4..cf02c76 100644
--- a/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestServer.java
+++ b/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestServer.java
@@ -31,7 +31,7 @@
   MockServer server;
 
   public void setUp() {
-    new Main(null);         // Initialize the config
+    new Main(new String[]{});         // Initialize the config
     server = new MockServer();
   }
diff --git a/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/tool/TestTempletonUtils.java b/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/tool/TestTempletonUtils.java
index 5f536d6..c97a90d 100644
--- a/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/tool/TestTempletonUtils.java
+++ b/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/tool/TestTempletonUtils.java
@@ -159,6 +159,12 @@ public void testHadoopFsPath() {
   @Test
   public void testHadoopFsFilename() {
     try {
+      String tmpFileName1 = "/tmp/testHadoopFsListAsArray1";
+      String tmpFileName2 = "/tmp/testHadoopFsListAsArray2";
+      File tmpFile1 = new File(tmpFileName1);
+      File tmpFile2 = new File(tmpFileName2);
+      tmpFile1.createNewFile();
+      tmpFile2.createNewFile();
       Assert.assertEquals(null, TempletonUtils.hadoopFsFilename(null, null, null));
       Assert.assertEquals(null, TempletonUtils.hadoopFsFilename(tmpFile.toURI().toString(), null, null));
@@ -188,14 +194,22 @@ public void testHadoopFsFilename() {
   @Test
   public void testHadoopFsListAsArray() {
     try {
+      String tmpFileName1 = "/tmp/testHadoopFsListAsArray1";
+      String tmpFileName2 = "/tmp/testHadoopFsListAsArray2";
+      File tmpFile1 = new File(tmpFileName1);
+      File tmpFile2 = new File(tmpFileName2);
+      tmpFile1.createNewFile();
+      tmpFile2.createNewFile();
       Assert.assertTrue(TempletonUtils.hadoopFsListAsArray(null, null, null) == null);
-      Assert.assertTrue(TempletonUtils.hadoopFsListAsArray(
-          tmpFile.toURI().toString() + "," + usrFile.toString(), null, null) == null);
-      String[] tmp2 = TempletonUtils.hadoopFsListAsArray(
-          tmpFile.toURI().toString() + "," + usrFile.toURI().toString(),
-          new Configuration(), null);
-      Assert.assertEquals(tmpFile.toURI().toString(), tmp2[0]);
-      Assert.assertEquals(usrFile.toURI().toString(), tmp2[1]);
+      Assert.assertTrue(TempletonUtils.hadoopFsListAsArray(tmpFileName1 + "," + tmpFileName2,
+          null, null) == null);
+      String[] tmp2
+        = TempletonUtils.hadoopFsListAsArray(tmpFileName1 + "," + tmpFileName2,
+            new Configuration(), null);
+      Assert.assertEquals("file:" + tmpFileName1, tmp2[0]);
+      Assert.assertEquals("file:" + tmpFileName2, tmp2[1]);
+      tmpFile1.delete();
+      tmpFile2.delete();
     } catch (FileNotFoundException e) {
       Assert.fail("Couldn't find name for " + tmpFile.toURI().toString());
     } catch (Exception e) {
@@ -218,15 +232,18 @@ public void testHadoopFsListAsArray() {
   @Test
   public void testHadoopFsListAsString() {
     try {
+      String tmpFileName1 = "/tmp/testHadoopFsListAsString1";
+      String tmpFileName2 = "/tmp/testHadoopFsListAsString2";
+      File tmpFile1 = new File(tmpFileName1);
+      File tmpFile2 = new File(tmpFileName2);
+      tmpFile1.createNewFile();
+      tmpFile2.createNewFile();
       Assert.assertTrue(TempletonUtils.hadoopFsListAsString(null, null, null) == null);
-      Assert.assertTrue(TempletonUtils.hadoopFsListAsString(
-          tmpFile.toURI().toString() + "," + usrFile.toURI().toString(),
+      Assert.assertTrue(TempletonUtils.hadoopFsListAsString("/tmp,/usr",
           null, null) == null);
-      Assert.assertEquals(
-          tmpFile.toURI().toString() + "," + usrFile.toURI().toString(),
-          TempletonUtils.hadoopFsListAsString(
-              tmpFile.toURI().toString() + "," + usrFile.toURI().toString(),
-              new Configuration(), null));
+      Assert.assertEquals("file:" + tmpFileName1 + ",file:" + tmpFileName2,
+          TempletonUtils.hadoopFsListAsString
+            (tmpFileName1 + "," + tmpFileName2, new Configuration(), null));
     } catch (FileNotFoundException e) {
       Assert.fail("Couldn't find name for " + tmpFile.toURI().toString());
     } catch (Exception e) {
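Note: the reworked tests create real local files instead of relying on pre-existing fixtures, then pin the scheme-qualified form ("file:" + path) that TempletonUtils returns once the default local filesystem resolves the names; new Main(new String[]{}) likewise appears to sidestep the null-args path in Main's config initialization. A sketch of the round trip the assertions rely on (file names are illustrative, and the null user means no proxy-user resolution):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hive.hcatalog.templeton.tool.TempletonUtils;

    public class FsListDemo {
      public static void main(String[] args) throws Exception {
        // Files must exist first; hadoopFsListAsArray stats each entry.
        new java.io.File("/tmp/a").createNewFile();
        new java.io.File("/tmp/b").createNewFile();
        String[] resolved = TempletonUtils.hadoopFsListAsArray(
            "/tmp/a,/tmp/b", new Configuration(), null);
        // With the default local filesystem: "file:/tmp/a", "file:/tmp/b"
        for (String s : resolved) System.out.println(s);
      }
    }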