diff --git hcatalog/webhcat/svr/src/main/config/webhcat-default.xml hcatalog/webhcat/svr/src/main/config/webhcat-default.xml index 1bef3c6..d369d5d 100644 --- hcatalog/webhcat/svr/src/main/config/webhcat-default.xml +++ hcatalog/webhcat/svr/src/main/config/webhcat-default.xml @@ -116,6 +116,25 @@ + templeton.hive.home + hive-0.13.0-SNAPSHOT-bin.tar.gz/hive-0.13.0-SNAPSHOT-bin + + The path to the Hive home within the tar. This is needed if Hive is not installed on all + nodes in the cluster and needs to be shipped to the target node in the cluster to execute Pig + job which uses HCat, Hive query, etc. Has no effect if templeton.hive.archive is not set. + + + + templeton.hcat.home + hive-0.13.0-SNAPSHOT-bin.tar.gz/hive-0.13.0-SNAPSHOT-bin/hcatalog + + The path to the HCat home within the tar. This is needed if Hive is not installed on all + nodes in the cluster and needs to be shipped to the target node in the cluster to execute Pig + job which uses HCat, Hive query, etc. Has no effect if templeton.hive.archive is not set. + + + + templeton.hive.properties hive.metastore.local=false,hive.metastore.uris=thrift://localhost:9933,hive.metastore.sasl.enabled=false Properties to set when running hive. 
diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java index 01dff5b..10fc6df 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java @@ -91,6 +91,14 @@ public static final String PYTHON_NAME = "templeton.python"; public static final String HIVE_ARCHIVE_NAME = "templeton.hive.archive"; public static final String HIVE_PATH_NAME = "templeton.hive.path"; + /** + * see webhcat-default.xml + */ + public static final String HIVE_HOME_PATH = "templeton.hive.home"; + /** + * see webhcat-default.xml + */ + public static final String HCAT_HOME_PATH = "templeton.hcat.home"; public static final String HIVE_PROPS_NAME = "templeton.hive.properties"; public static final String LIB_JARS_NAME = "templeton.libjars"; public static final String PIG_ARCHIVE_NAME = "templeton.pig.archive"; diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java index c616d6c..a9e13c6 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java @@ -128,7 +128,7 @@ public EnqueueBean run(String user, Map userArgs, if (appConf.hiveArchive() != null && !appConf.hiveArchive().equals("")) { - args.add("-archives"); + args.add(ARCHIVES); args.add(appConf.hiveArchive()); } diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java index bea08bb..0d51bc7 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java +++ 
hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java @@ -43,14 +43,14 @@ public EnqueueBean run(String user, Map userArgs, String jar, St String libjars, String files, List jarArgs, List defines, String statusdir, String callback, - boolean usehcatalog, String completedUrl, + boolean usesHcatalog, String completedUrl, boolean enablelog, JobType jobType) throws NotAuthorizedException, BadParam, BusyException, QueueException, ExecuteException, IOException, InterruptedException { runAs = user; List args = makeArgs(jar, mainClass, libjars, files, jarArgs, defines, - statusdir, usehcatalog, completedUrl, enablelog, jobType); + statusdir, usesHcatalog, completedUrl, enablelog, jobType); return enqueueController(user, userArgs, callback, args); } @@ -58,7 +58,7 @@ public EnqueueBean run(String user, Map userArgs, String jar, St private List makeArgs(String jar, String mainClass, String libjars, String files, List jarArgs, List defines, - String statusdir, boolean usehcatalog, String completedUrl, + String statusdir, boolean usesHcatalog, String completedUrl, boolean enablelog, JobType jobType) throws BadParam, IOException, InterruptedException { ArrayList args = new ArrayList(); @@ -72,7 +72,7 @@ public EnqueueBean run(String user, Map userArgs, String jar, St TempletonUtils.addCmdForWindows(args); //check if the rest command specified explicitly to use hcatalog - if(usehcatalog){ + if(usesHcatalog){ addHiveMetaStoreTokenArg(); } diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java index b8b5973..854aa99 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java @@ -97,6 +97,9 @@ public EnqueueBean enqueueController(String user, Map userArgs, private String 
queueAsUser(UserGroupInformation ugi, final List args) throws IOException, InterruptedException { + if(LOG.isDebugEnabled()) { + LOG.debug("Launching job: " + args); + } return ugi.doAs(new PrivilegedExceptionAction() { public String run() throws Exception { String[] array = new String[args.size()]; diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Main.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Main.java index 0f37278..c52ea77 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Main.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Main.java @@ -202,10 +202,13 @@ public Server runServer(int port) public FilterHolder makeAuthFilter() { FilterHolder authFilter = new FilterHolder(AuthFilter.class); if (UserGroupInformation.isSecurityEnabled()) { + //http://hadoop.apache.org/docs/r1.1.1/api/org/apache/hadoop/security/authentication/server/AuthenticationFilter.html authFilter.setInitParameter("dfs.web.authentication.signature.secret", conf.kerberosSecret()); + //http://docs.hortonworks.com/HDPDocuments/HDP1/HDP-1.2.2/bk_installing_manually_book/content/rpm-chap14-2-3-2.html authFilter.setInitParameter("dfs.web.authentication.kerberos.principal", conf.kerberosPrincipal()); + //http://docs.hortonworks.com/HDPDocuments/HDP1/HDP-1.2.2/bk_installing_manually_book/content/rpm-chap14-2-3-2.html authFilter.setInitParameter("dfs.web.authentication.kerberos.keytab", conf.kerberosKeytab()); } diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java index 9adb1d9..64a68bd 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java @@ -29,6 +29,7 @@ import org.apache.commons.exec.ExecuteException; import 
org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hive.hcatalog.templeton.tool.JobSubmissionConstants; import org.apache.hive.hcatalog.templeton.tool.TempletonControllerJob; import org.apache.hive.hcatalog.templeton.tool.TempletonUtils; @@ -47,13 +48,13 @@ public EnqueueBean run(String user, Map userArgs, String execute, String srcFile, List pigArgs, String otherFiles, String statusdir, String callback, - boolean usehcatalog, String completedUrl, boolean enablelog) + boolean useSHcatalog, String completedUrl, boolean enablelog) throws NotAuthorizedException, BadParam, BusyException, QueueException, ExecuteException, IOException, InterruptedException { runAs = user; List args = makeArgs(execute, srcFile, pigArgs, - otherFiles, statusdir, usehcatalog, completedUrl, enablelog); + otherFiles, statusdir, useSHcatalog, completedUrl, enablelog); return enqueueController(user, userArgs, callback, args); } @@ -64,20 +65,23 @@ public EnqueueBean run(String user, Map userArgs, * @param pigArgs pig command line arguments * @param otherFiles files to be copied to the map reduce cluster * @param statusdir status dir location - * @param usehcatalog whether the command uses hcatalog/needs to connect + * @param usesHcatalog whether the command uses hcatalog/needs to connect * to hive metastore server * @param completedUrl call back url - * @return * @throws BadParam * @throws IOException * @throws InterruptedException */ private List makeArgs(String execute, String srcFile, List pigArgs, String otherFiles, - String statusdir, boolean usehcatalog, - String completedUrl, boolean enablelog) - throws BadParam, IOException, InterruptedException { + String statusdir, boolean usesHcatalog, + String completedUrl, boolean enablelog) throws BadParam, IOException, + InterruptedException { ArrayList args = new ArrayList(); + //check if the REST command specified explicitly to use hcatalog + // or if it says that implicitly using the pig 
-useHCatalog arg + boolean needsMetastoreAccess = usesHcatalog || hasPigArgUseHcat(pigArgs); + try { ArrayList allFiles = new ArrayList(); if (TempletonUtils.isset(srcFile)) { @@ -89,12 +93,32 @@ public EnqueueBean run(String user, Map userArgs, } args.addAll(makeLauncherArgs(appConf, statusdir, completedUrl, allFiles, enablelog, JobType.PIG)); - if (appConf.pigArchive() != null && !appConf.pigArchive().equals("")) - { - args.add("-archives"); - args.add(appConf.pigArchive()); + boolean shipPigTar = appConf.pigArchive() != null && !appConf.pigArchive().equals(""); + boolean shipHiveTar = needsMetastoreAccess && appConf.hiveArchive() != null + && !appConf.hiveArchive().equals(""); + if(shipPigTar || shipHiveTar) { + args.add(ARCHIVES); + StringBuilder archives = new StringBuilder(); + if(shipPigTar) { + archives.append(appConf.pigArchive()); + } + if(shipPigTar && shipHiveTar) { + archives.append(","); + } + if(shipHiveTar) { + archives.append(appConf.hiveArchive()); + } + args.add(archives.toString()); + } + if(shipHiveTar) { + addDef(args, JobSubmissionConstants.PigConstants.HIVE_HOME, + appConf.get(AppConfig.HIVE_HOME_PATH)); + addDef(args, JobSubmissionConstants.PigConstants.HCAT_HOME, + appConf.get(AppConfig.HCAT_HOME_PATH)); + //Pig which uses HCat will pass this to HCat so that it can find the metastore + addDef(args, JobSubmissionConstants.PigConstants.PIG_OPTS, + appConf.get(AppConfig.HIVE_PROPS_NAME)); } - args.add("--"); TempletonUtils.addCmdForWindows(args); args.add(appConf.pigPath()); @@ -104,9 +128,7 @@ public EnqueueBean run(String user, Map userArgs, for (String pigArg : pigArgs) { args.add(TempletonUtils.quoteForWindows(pigArg)); } - //check if the REST command specified explicitly to use hcatalog - // or if it says that implicitly using the pig -useHCatalog arg - if(usehcatalog || hasPigArgUseHcat(pigArgs)){ + if(needsMetastoreAccess) { addHiveMetaStoreTokenArg(); } diff --git 
hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java index f41450c..665e5f9 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java @@ -635,7 +635,7 @@ public EnqueueBean mapReduceStreaming(@FormParam("input") List inputs, /** * Run a MapReduce Jar job. * Params correspond to the REST api params - * @param usehcatalog if {@code true}, means the Jar uses HCat and thus needs to access + * @param usesHcatalog if {@code true}, means the Jar uses HCat and thus needs to access * metastore, which requires additional steps for WebHCat to perform in a secure cluster. * @param callback URL which WebHCat will call when the hive job finishes * @see org.apache.hive.hcatalog.templeton.tool.TempletonControllerJob @@ -651,7 +651,7 @@ public EnqueueBean mapReduceJar(@FormParam("jar") String jar, @FormParam("define") List defines, @FormParam("statusdir") String statusdir, @FormParam("callback") String callback, - @FormParam("usehcatalog") boolean usehcatalog, + @FormParam("usehcatalog") boolean usesHcatalog, @FormParam("enablelog") boolean enablelog) throws NotAuthorizedException, BusyException, BadParam, QueueException, ExecuteException, IOException, InterruptedException { @@ -677,14 +677,14 @@ public EnqueueBean mapReduceJar(@FormParam("jar") String jar, return d.run(getDoAsUser(), userArgs, jar, mainClass, libjars, files, args, defines, - statusdir, callback, usehcatalog, getCompletedUrl(), enablelog, JobType.JAR); + statusdir, callback, usesHcatalog, getCompletedUrl(), enablelog, JobType.JAR); } /** * Run a Pig job. - * Params correspond to the REST api params. If '-useHCatalog' is in the {@code pigArgs, usehcatalog}, + * Params correspond to the REST api params. If '-useHCatalog' is in the {@code pigArgs}, {@code usesHcatalog} * is interpreted as true. 
- * @param usehcatalog if {@code true}, means the Pig script uses HCat and thus needs to access + * @param usesHcatalog if {@code true}, means the Pig script uses HCat and thus needs to access * metastore, which requires additional steps for WebHCat to perform in a secure cluster. * This does nothing to ensure that Pig is installed on target node in the cluster. * @param callback URL which WebHCat will call when the hive job finishes @@ -699,7 +699,7 @@ public EnqueueBean pig(@FormParam("execute") String execute, @FormParam("files") String otherFiles, @FormParam("statusdir") String statusdir, @FormParam("callback") String callback, - @FormParam("usehcatalog") boolean usehcatalog, + @FormParam("usehcatalog") boolean usesHcatalog, @FormParam("enablelog") boolean enablelog) throws NotAuthorizedException, BusyException, BadParam, QueueException, ExecuteException, IOException, InterruptedException { @@ -725,7 +725,7 @@ public EnqueueBean pig(@FormParam("execute") String execute, return d.run(getDoAsUser(), userArgs, execute, srcFile, pigArgs, otherFiles, - statusdir, callback, usehcatalog, getCompletedUrl(), enablelog); + statusdir, callback, usesHcatalog, getCompletedUrl(), enablelog); } /** diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/TempletonDelegator.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/TempletonDelegator.java index 532a191..cd20c26 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/TempletonDelegator.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/TempletonDelegator.java @@ -24,6 +24,11 @@ * or hive. 
*/ public class TempletonDelegator { + /** + * http://hadoop.apache.org/docs/r1.0.4/commands_manual.html#Generic+Options + */ + public static final String ARCHIVES = "-archives"; + protected AppConfig appConf; public TempletonDelegator(AppConfig appConf) { diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobSubmissionConstants.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobSubmissionConstants.java index 1daea39..482e993 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobSubmissionConstants.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobSubmissionConstants.java @@ -34,4 +34,12 @@ public static final int WATCHER_TIMEOUT_SECS = 10; public static final int KEEP_ALIVE_MSEC = 60 * 1000; public static final String TOKEN_FILE_ARG_PLACEHOLDER = "__WEBHCAT_TOKEN_FILE_LOCATION__"; + /** + * constants needed for Pig job submission + */ + public static interface PigConstants { + public static final String HIVE_HOME = "HIVE_HOME"; + public static final String HCAT_HOME = "HCAT_HOME"; + public static final String PIG_OPTS = "PIG_OPTS"; + } } diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/LaunchMapper.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/LaunchMapper.java index 716c2e3..db8928a 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/LaunchMapper.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/LaunchMapper.java @@ -33,6 +33,7 @@ import org.apache.hive.hcatalog.templeton.LauncherDelegator; import java.io.BufferedReader; +import java.io.File; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; @@ -65,8 +66,25 @@ * it will end up in 'syslog' of this Map task. For example, look for KeepAlive heartbeat msgs. 
*/ private static final Log LOG = LogFactory.getLog(LaunchMapper.class); - - + /** + * When a Pig job is submitted and it uses HCat, WebHCat may be configured to ship hive tar + * to the target node. Pig on the target node needs some env vars configured. + */ + private static void handlePigEnvVars(Configuration conf, Map env) { + if(conf.get(PigConstants.HIVE_HOME) != null) { + env.put(PigConstants.HIVE_HOME, new File(conf.get(PigConstants.HIVE_HOME)).getAbsolutePath()); + } + if(conf.get(PigConstants.HCAT_HOME) != null) { + env.put(PigConstants.HCAT_HOME, new File(conf.get(PigConstants.HCAT_HOME)).getAbsolutePath()); + } + if(conf.get(PigConstants.PIG_OPTS) != null) { + StringBuilder pigOpts = new StringBuilder(); + for(String prop : conf.get(PigConstants.PIG_OPTS).split(",")) { + pigOpts.append("-D").append(prop).append(" "); + } + env.put(PigConstants.PIG_OPTS, pigOpts.toString()); + } + } protected Process startJob(Context context, String user, String overrideClasspath) throws IOException, InterruptedException { Configuration conf = context.getConfiguration(); @@ -79,8 +97,8 @@ protected Process startJob(Context context, String user, String overrideClasspat removeEnv.add("hadoop-command"); removeEnv.add("CLASS"); removeEnv.add("mapredcommand"); - Map env = TempletonUtils.hadoopUserEnv(user, - overrideClasspath); + Map env = TempletonUtils.hadoopUserEnv(user, overrideClasspath); + handlePigEnvVars(conf, env); List jarArgsList = new LinkedList(Arrays.asList(jarArgs)); String tokenFile = System.getenv("HADOOP_TOKEN_FILE_LOCATION"); diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java index 802dfa2..c1dedce 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java +++ 
hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.net.URI; import java.security.PrivilegedExceptionAction; +import java.util.Arrays; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -180,6 +181,9 @@ public String getSubmittedId() { @Override public int run(String[] args) throws IOException, InterruptedException, ClassNotFoundException, TException { + if(LOG.isDebugEnabled()) { + LOG.debug("Preparing to submit job: " + Arrays.toString(args)); + } Configuration conf = getConf(); conf.set(JAR_ARGS_NAME, TempletonUtils.encodeArray(args));